Update levin async callbacks for C++14

This commit is contained in:
Lee *!* Clagett 2022-08-29 15:03:42 -04:00
parent c9cfa25183
commit a263cae14a
6 changed files with 93 additions and 117 deletions

View file

@ -379,7 +379,7 @@ namespace net_utils
try_connect_result_t try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support);
bool connect(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_connection_context& cn, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect);
template<class t_callback>
bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, const t_callback &cb, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect);
bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_callback &&cb, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect);
boost::asio::ssl::context& get_ssl_context() noexcept
{

View file

@ -1786,7 +1786,7 @@ namespace net_utils
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler> template<class t_callback>
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support)
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback &&cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support)
{
TRY_ENTRY();
connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, ssl_support) );
@ -1879,7 +1879,7 @@ namespace net_utils
}
});
//start async connect
sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_)
sock_.async_connect(remote_endpoint, [=, cb = std::forward<t_callback>(cb)](const boost::system::error_code& ec_) mutable
{
t_connection_context conn_context = AUTO_VAL_INIT(conn_context);
boost::system::error_code ignored_ec;

View file

@ -104,7 +104,7 @@ public:
int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
template<class callback_t>
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, callback_t &&cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id);
@ -190,105 +190,90 @@ public:
struct invoke_response_handler_base
{
virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
virtual bool is_timer_started() const=0;
virtual void cancel()=0;
virtual bool cancel_timer()=0;
virtual void reset_timer()=0;
};
template <class callback_t>
struct anvoke_handler: invoke_response_handler_base
{
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
:m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
{
if(m_con.start_outer_call())
{
MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
epee::span<const uint8_t> fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
});
m_timer_started = true;
}
}
virtual ~anvoke_handler()
{}
callback_t m_cb;
async_protocol_handler& m_con;
protected:
boost::asio::deadline_timer m_timer;
bool m_timer_started;
bool m_cancel_timer_called;
bool m_timer_cancelled;
uint64_t m_timeout;
int m_command;
virtual bool handle(int res, const epee::span<const uint8_t> buff, typename async_protocol_handler::connection_context& context)
async_protocol_handler& m_con;
const uint64_t m_timeout;
const int m_command;
virtual void do_handle(int res, const epee::span<const uint8_t> buff)=0;
public:
invoke_response_handler_base(uint64_t timeout, async_protocol_handler& con, int command)
: m_timer(con.m_pservice_endpoint->get_io_service()),
m_con(con),
m_timeout(timeout),
m_command(command)
{}
virtual ~invoke_response_handler_base() {}
//! Run user handler. Only call if `cancel_timer()` returns true.
void handle(int res, const epee::span<const uint8_t> buff)
{
if(!cancel_timer())
return false;
m_cb(res, buff, context);
do_handle(res, buff);
m_con.finish_outer_call();
}
//! Cancel timer and run user handler if not previously invoked \return True if user handler invoked
bool cancel()
{
if (!cancel_timer())
return false;
handle(LEVIN_ERROR_CONNECTION_DESTROYED, nullptr);
return true;
}
virtual bool is_timer_started() const
//! Cancel timer but do not run user handler. \return True if user handler has not been invoked
bool cancel_timer()
{
return m_timer_started;
boost::system::error_code ignored{};
return m_timer.cancel(ignored);
}
virtual void cancel()
// Attempt to adjust timeout for user handler execution. \return True if timer adjusted.
static bool reset_timer(const std::shared_ptr<invoke_response_handler_base>& self, const bool initial = false)
{
if(cancel_timer())
if (!self || (!initial && !self->cancel_timer()))
return false;
if (initial && !self->m_con.start_outer_call())
return false;
self->m_timer.expires_from_now(boost::posix_time::milliseconds(self->m_timeout));
self->m_timer.async_wait([self](const boost::system::error_code& ec)
{
epee::span<const uint8_t> fake;
m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref());
m_con.finish_outer_call();
}
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(self->m_con.get_context_ref() << "Timeout on invoke operation happened, command: " << self->m_command << " timeout: " << self->m_timeout);
self->do_handle(LEVIN_ERROR_CONNECTION_TIMEDOUT, nullptr); // delay finish_outer_call
self->m_con.close();
self->m_con.finish_outer_call();
});
return true;
}
virtual bool cancel_timer()
};
template <class callback_t>
struct anvoke_handler final : invoke_response_handler_base
{
template<typename F>
anvoke_handler(F&& cb, uint64_t timeout, async_protocol_handler& con, int command)
: invoke_response_handler_base(timeout, con, command), m_cb(std::forward<F>(cb))
{}
virtual ~anvoke_handler() override final
{}
private:
callback_t m_cb;
virtual void do_handle(int res, const epee::span<const uint8_t> buff) override final
{
if(!m_cancel_timer_called)
{
m_cancel_timer_called = true;
boost::system::error_code ignored_ec;
m_timer_cancelled = 1 == m_timer.cancel(ignored_ec);
}
return m_timer_cancelled;
}
virtual void reset_timer()
{
boost::system::error_code ignored_ec;
if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
{
callback_t& cb = m_cb;
uint64_t timeout = m_timeout;
async_protocol_handler& con = m_con;
int command = m_command;
m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
epee::span<const uint8_t> fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
});
}
m_cb(res, buff, this->m_con.get_context_ref());
}
};
critical_section m_invoke_response_handlers_lock;
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
std::deque<std::shared_ptr<invoke_response_handler_base>> m_invoke_response_handlers;
template<class callback_t>
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command)
bool add_invoke_response_handler(callback_t &&cb, uint64_t timeout, async_protocol_handler& con, int command)
{
CRITICAL_REGION_LOCAL(m_invoke_response_handlers_lock);
if (m_protocol_released)
@ -296,9 +281,10 @@ public:
MERROR("Adding response handler to a released object");
return false;
}
boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
m_invoke_response_handlers.push_back(handler);
return handler->is_timer_started();
m_invoke_response_handlers.push_back(
std::make_shared<anvoke_handler<std::decay_t<callback_t>>>(std::forward<callback_t>(cb), timeout, con, command)
);
return invoke_response_handler_base::reset_timer(m_invoke_response_handlers.back(), true);
}
template<class callback_t> friend struct anvoke_handler;
public:
@ -372,9 +358,8 @@ public:
// Never call callback inside critical section, that can cause deadlock. Callback can be called when
// invoke_response_handler_base is cancelled
std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
for (const std::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr : local_invoke_response_handlers)
pinv_resp_hndlr->cancel();
});
return true;
}
@ -447,8 +432,7 @@ public:
if (!m_invoke_response_handlers.empty())
{
//async call scenario
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
response_handler->reset_timer();
invoke_response_handler_base::reset_timer(m_invoke_response_handlers.front());
MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb << ", current total " << m_cache_in_buffer.size() << "/" << m_current_head.m_cb << " (" << (100.0f * m_cache_in_buffer.size() / (m_current_head.m_cb ? m_current_head.m_cb : 1)) << "%)");
}
}
@ -509,7 +493,7 @@ public:
epee::critical_region_t<decltype(m_invoke_response_handlers_lock)> invoke_response_handlers_guard(m_invoke_response_handlers_lock);
if(!m_invoke_response_handlers.empty())
{//async call scenario
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
const std::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
bool timer_cancelled = response_handler->cancel_timer();
// Don't pop handler, to avoid destroying it
if(timer_cancelled)
@ -517,7 +501,7 @@ public:
invoke_response_handlers_guard.unlock();
if(timer_cancelled)
response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context);
response_handler->handle(m_current_head.m_return_code, buff_to_invoke);
}
else
{
@ -628,7 +612,7 @@ public:
}
template<class callback_t>
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke(int command, message_writer in_msg, callback_t &&cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -654,7 +638,7 @@ public:
break;
}
if(!add_invoke_response_handler(cb, timeout, *this, command))
if(!add_invoke_response_handler(std::forward<callback_t>(cb), timeout, *this, command))
{
err_code = LEVIN_ERROR_CONNECTION_DESTROYED;
break;
@ -836,11 +820,11 @@ int async_protocol_handler_config<t_connection_context>::invoke(int command, mes
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, callback_t &&cb, size_t timeout)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), std::forward<callback_t>(cb), timeout) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>

View file

@ -84,14 +84,14 @@ namespace epee
}
template<class t_result, class t_arg, class callback_t, class t_transport>
bool async_invoke_remote_command2(const epee::net_utils::connection_context_base &context, int command, const t_arg& out_struct, t_transport& transport, const callback_t &cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke_remote_command2(const epee::net_utils::connection_context_base &context, int command, const t_arg& out_struct, t_transport& transport, callback_t &&cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg;
const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation
levin::message_writer to_send{16 * 1024};
stg.store_to_binary(to_send.buffer);
int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb = std::forward<callback_t>(cb), command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
{
t_result result_struct = AUTO_VAL_INIT(result_struct);
if( code <=0 )

View file

@ -385,7 +385,7 @@ namespace nodetool
void delete_upnp_port_mapping_v6(uint32_t port);
void delete_upnp_port_mapping(uint32_t port);
template<class t_callback>
bool try_ping(basic_node_data& node_data, p2p_connection_context& context, const t_callback &cb);
bool try_ping(basic_node_data& node_data, p2p_connection_context& context, t_callback &&cb);
bool try_get_support_flags(const p2p_connection_context& context, std::function<void(p2p_connection_context&, const uint32_t&)> f);
bool make_expected_connections_count(network_zone& zone, PeerType peer_type, size_t expected_connections);
void record_addr_failed(const epee::net_utils::network_address& addr);

View file

@ -2321,7 +2321,7 @@ namespace nodetool
}
//-----------------------------------------------------------------------------------
template<class t_payload_net_handler> template<class t_callback>
bool node_server<t_payload_net_handler>::try_ping(basic_node_data& node_data, p2p_connection_context& context, const t_callback &cb)
bool node_server<t_payload_net_handler>::try_ping(basic_node_data& node_data, p2p_connection_context& context, t_callback &&cb)
{
if(!node_data.my_port)
return false;
@ -2364,9 +2364,9 @@ namespace nodetool
address = epee::net_utils::network_address{epee::net_utils::ipv6_network_address(ipv6_addr, node_data.my_port)};
}
peerid_type pr = node_data.peer_id;
bool r = zone.m_net_server.connect_async(ip, port, zone.m_config.m_net_config.ping_connection_timeout, [cb, /*context,*/ address, pr, this](
bool r = zone.m_net_server.connect_async(ip, port, zone.m_config.m_net_config.ping_connection_timeout, [cb = std::forward<t_callback>(cb), /*context,*/ address, pr, this] (
const typename net_server::t_connection_context& ping_context,
const boost::system::error_code& ec)->bool
const boost::system::error_code& ec) mutable ->bool
{
if(ec)
{
@ -2375,19 +2375,11 @@ namespace nodetool
}
COMMAND_PING::request req;
COMMAND_PING::response rsp;
//vc2010 workaround
/*std::string ip_ = ip;
std::string port_=port;
peerid_type pr_ = pr;
auto cb_ = cb;*/
// GCC 5.1.0 gives error with second use of uint64_t (peerid_type) variable.
peerid_type pr_ = pr;
network_zone& zone = m_network_zones.at(address.get_zone());
bool inv_call_res = epee::net_utils::async_invoke_remote_command2<COMMAND_PING::response>(ping_context, COMMAND_PING::ID, req, zone.m_net_server.get_config_object(),
[=](int code, const COMMAND_PING::response& rsp, p2p_connection_context& context)
[this, pr, cb = std::move(cb), ping_context = std::move(ping_context), address = std::move(address)](int code, const COMMAND_PING::response& rsp, p2p_connection_context& context)
{
if(code <= 0)
{
@ -2398,7 +2390,7 @@ namespace nodetool
network_zone& zone = m_network_zones.at(address.get_zone());
if(rsp.status != PING_OK_RESPONSE_STATUS_TEXT || pr != rsp.peer_id)
{
LOG_WARNING_CC(ping_context, "back ping invoke wrong response \"" << rsp.status << "\" from" << address.str() << ", hsh_peer_id=" << pr_ << ", rsp.peer_id=" << peerid_to_string(rsp.peer_id));
LOG_WARNING_CC(ping_context, "back ping invoke wrong response \"" << rsp.status << "\" from" << address.str() << ", hsh_peer_id=" << pr << ", rsp.peer_id=" << peerid_to_string(rsp.peer_id));
zone.m_net_server.get_config_object().close(ping_context.m_connection_id);
return;
}
@ -2434,7 +2426,7 @@ namespace nodetool
COMMAND_REQUEST_SUPPORT_FLAGS::ID,
support_flags_request,
m_network_zones.at(epee::net_utils::zone::public_).m_net_server.get_config_object(),
[=](int code, const typename COMMAND_REQUEST_SUPPORT_FLAGS::response& rsp, p2p_connection_context& context_)
[f = std::move(f)](int code, const typename COMMAND_REQUEST_SUPPORT_FLAGS::response& rsp, p2p_connection_context& context_)
{
if(code < 0)
{