/** @file @author from CrypoNote (see copyright below; Andrey N. Sabelnikov) @monero rfree @brief the connection templated-class for one peer connection */ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // * Neither the name of the Andrey N. Sabelnikov nor the // names of its contributors may be used to endorse or promote products // derived from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // #include <boost/bind.hpp> #include <boost/foreach.hpp> #include <boost/uuid/random_generator.hpp> #include <boost/chrono.hpp> #include <boost/utility/value_init.hpp> #include <boost/asio/deadline_timer.hpp> #include <boost/date_time/posix_time/posix_time.hpp> // TODO #include <boost/thread/condition_variable.hpp> // TODO #include <boost/make_shared.hpp> #include "warnings.h" #include "string_tools.h" #include "misc_language.h" #include "net/local_ip.h" #include "pragma_comp_defs.h" #include <sstream> #include <iomanip> #include <algorithm> #include <functional> #include <random> #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" #define AGGRESSIVE_TIMEOUT_THRESHOLD 120 // sockets #define NEW_CONNECTION_TIMEOUT_LOCAL 1200000 // 2 minutes #define NEW_CONNECTION_TIMEOUT_REMOTE 10000 // 10 seconds #define DEFAULT_TIMEOUT_MS_LOCAL 1800000 // 30 minutes #define DEFAULT_TIMEOUT_MS_REMOTE 300000 // 5 minutes #define TIMEOUT_EXTRA_MS_PER_BYTE 0.2 PRAGMA_WARNING_PUSH namespace epee { namespace net_utils { template<typename T> T& check_and_get(std::shared_ptr<T>& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; } /************************************************************************/ /* */ /************************************************************************/ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection(boost::asio::ip::tcp::socket{io_service}, std::move(state), connection_type, ssl_support) { } template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock, std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection_basic(std::move(sock), state, ssl_support), m_protocol_handler(this, check_and_get(state), context), buffer_ssl_init_fill(0), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), m_throttle_speed_out("speed_out", "throttle_speed_out"), m_timer(GET_IO_SERVICE(socket_)), m_local(false), m_ready_to_close(false) { MDEBUG("test, connection constructor set m_connection_type="<<m_connection_type); } PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- template<class t_protocol_handler> connection<t_protocol_handler>::~connection() noexcept(false) { if(!m_was_shutdown) { _dbg3("[sock " << socket().native_handle() << "] Socket destroyed without shutdown."); shutdown(); } _dbg3("[sock " << socket().native_handle() << "] Socket destroyed"); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> boost::shared_ptr<connection<t_protocol_handler> > connection<t_protocol_handler>::safe_shared_from_this() { try { return connection<t_protocol_handler>::shared_from_this(); } catch (const boost::bad_weak_ptr&) { // It happens when the connection is being deleted return boost::shared_ptr<connection<t_protocol_handler> >(); } } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::start(bool is_income, bool is_multithreaded) { TRY_ENTRY(); boost::system::error_code ec; auto remote_ep = socket().remote_endpoint(ec); CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get remote endpoint: " << ec.message() << ':' << ec.value()); CHECK_AND_NO_ASSERT_MES(remote_ep.address().is_v4() || remote_ep.address().is_v6(), false, "only IPv4 and IPv6 supported here"); if (remote_ep.address().is_v4()) { const unsigned long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong()); return start(is_income, is_multithreaded, ipv4_network_address{uint32_t(ip_), remote_ep.port()}); } else { const auto ip_ = remote_ep.address().to_v6(); return start(is_income, is_multithreaded, ipv6_network_address{ip_, remote_ep.port()}); } CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::start(bool is_income, bool is_multithreaded, network_address real_remote) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) return false; m_is_multithreaded = is_multithreaded; m_local = real_remote.is_loopback() || real_remote.is_local(); // create a random uuid, we don't need crypto strength here const boost::uuids::uuid random_uuid = boost::uuids::random_generator()(); context = t_connection_context{}; bool ssl = m_ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled; context.set_details(random_uuid, std::move(real_remote), is_income, ssl); boost::system::error_code ec; auto local_ep = socket().local_endpoint(ec); CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get local endpoint: " << ec.message() << ':' << ec.value()); _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << " to " << local_ep.address().to_string() << ':' << local_ep.port() << ", total sockets objects " << get_state().sock_count); if(static_cast<shared_state&>(get_state()).pfilter && !static_cast<shared_state&>(get_state()).pfilter->is_remote_host_allowed(context.m_remote_address)) { _dbg2("[sock " << socket().native_handle() << "] host denied " << context.m_remote_address.host_str() << ", shutdowning connection"); close(); return false; } m_host = context.m_remote_address.host_str(); try { host_count(m_host, 1); } catch(...) { /* ignore */ } m_protocol_handler.after_init_connection(); reset_timer(boost::posix_time::milliseconds(m_local ? NEW_CONNECTION_TIMEOUT_LOCAL : NEW_CONNECTION_TIMEOUT_REMOTE), false); // first read on the raw socket to detect SSL for the server buffer_ssl_init_fill = 0; if (is_income && m_ssl_support != epee::net_utils::ssl_support_t::e_ssl_support_disabled) socket().async_receive(boost::asio::buffer(buffer_), boost::asio::socket_base::message_peek, strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_receive, self, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); else async_read_some(boost::asio::buffer(buffer_), strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_read, self, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); #if !defined(_WIN32) || !defined(__i686) // not supported before Windows7, too lazy for runtime check // Just exclude for 32bit windows builds //set ToS flag int tos = get_tos_flag(); boost::asio::detail::socket_option::integer< IPPROTO_IP, IP_TOS > optionTos( tos ); socket().set_option( optionTos ); //_dbg1("Set ToS flag to " << tos); #endif boost::asio::ip::tcp::no_delay noDelayOption(false); socket().set_option(noDelayOption); return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::request_callback() { TRY_ENTRY(); _dbg2("[" << print_connection_context_short(context) << "] request_callback"); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) return false; strand_.post(boost::bind(&connection<t_protocol_handler>::call_back_starter, self)); CATCH_ENTRY_L0("connection<t_protocol_handler>::request_callback()", false); return true; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> boost::asio::io_service& connection<t_protocol_handler>::get_io_service() { return GET_IO_SERVICE(socket()); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::add_ref() { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) return false; //_dbg3("[sock " << socket().native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number); CRITICAL_REGION_LOCAL(self->m_self_refs_lock); //_dbg3("[sock " << socket().native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number); if(m_was_shutdown) return false; ++m_reference_count; m_self_ref = std::move(self); return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::add_ref()", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::release() { TRY_ENTRY(); boost::shared_ptr<connection<t_protocol_handler> > back_connection_copy; LOG_TRACE_CC(context, "[sock " << socket().native_handle() << "] release"); CRITICAL_REGION_BEGIN(m_self_refs_lock); CHECK_AND_ASSERT_MES(m_reference_count, false, "[sock " << socket().native_handle() << "] m_reference_count already at 0 at connection<t_protocol_handler>::release() call"); // is this the last reference? if (--m_reference_count == 0) { // move the held reference to a local variable, keeping the object alive until the function terminates std::swap(back_connection_copy, m_self_ref); } CRITICAL_REGION_END(); return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::release()", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::call_back_starter() { TRY_ENTRY(); _dbg2("[" << print_connection_context_short(context) << "] fired_callback"); m_protocol_handler.handle_qued_callback(); CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void()); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::save_dbg_log() { std::string address, port; boost::system::error_code e; boost::asio::ip::tcp::endpoint endpoint = socket().remote_endpoint(e); if (e) { address = "<not connected>"; port = "<not connected>"; } else { address = endpoint.address().to_string(); port = boost::lexical_cast<std::string>(endpoint.port()); } MDEBUG(" connection type " << to_string( m_connection_type ) << " " << socket().local_endpoint().address().to_string() << ":" << socket().local_endpoint().port() << " <--> " << context.m_remote_address.str() << " (via " << address << ":" << port << ")"); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::handle_read(const boost::system::error_code& e, std::size_t bytes_transferred) { TRY_ENTRY(); //_info("[sock " << socket().native_handle() << "] Async read calledback."); if (!e) { double current_speed_down; { CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex); m_throttle_speed_in.handle_trafic_exact(bytes_transferred); current_speed_down = m_throttle_speed_in.get_current_speed(); } context.m_current_speed_down = current_speed_down; context.m_max_speed_down = std::max(context.m_max_speed_down, current_speed_down); { CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in ); epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred); } double delay=0; // will be calculated - how much we should sleep to obey speed limit etc if (speed_limit_is_enabled()) { do // keep sleeping if we should sleep { { //_scope_dbg1("CRITICAL_REGION_LOCAL"); CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in ); delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); } delay *= 0.5; if (delay > 0) { long int ms = (long int)(delay * 100); reset_timer(boost::posix_time::milliseconds(ms + 1), true); boost::this_thread::sleep_for(boost::chrono::milliseconds(ms)); } } while(delay > 0); } // any form of sleeping //_info("[sock " << socket().native_handle() << "] RECV " << bytes_transferred); logger_handle_net_read(bytes_transferred); context.m_last_recv = time(NULL); context.m_recv_cnt += bytes_transferred; m_ready_to_close = false; bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred); if(!recv_res) { //_info("[sock " << socket().native_handle() << "] protocol_want_close"); //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(!m_send_que.size()) do_shutdown = true; CRITICAL_REGION_END(); if(do_shutdown) shutdown(); }else { reset_timer(get_timeout_from_bytes_read(bytes_transferred), false); async_read_some(boost::asio::buffer(buffer_), strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); //_info("[sock " << socket().native_handle() << "]Async read requested."); } }else { _dbg3("[sock " << socket().native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); if(e.value() != 2) { _dbg3("[sock " << socket().native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); shutdown(); } else { _dbg3("[sock " << socket().native_handle() << "] peer closed connection"); bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(!m_send_que.size()) do_shutdown = true; CRITICAL_REGION_END(); if (m_ready_to_close || do_shutdown) shutdown(); } m_ready_to_close = true; } // If an error occurs then no new asynchronous operations are started. This // means that all shared_ptr references to the connection object will // disappear and the object will be destroyed automatically after this // handler returns. The connection class's destructor closes the socket. CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_read", void()); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::handle_receive(const boost::system::error_code& e, std::size_t bytes_transferred) { TRY_ENTRY(); if (e) { // offload the error case handle_read(e, bytes_transferred); return; } reset_timer(get_timeout_from_bytes_read(bytes_transferred), false); buffer_ssl_init_fill += bytes_transferred; if (buffer_ssl_init_fill <= get_ssl_magic_size()) { socket().async_receive(boost::asio::buffer(buffer_.data() + buffer_ssl_init_fill, buffer_.size() - buffer_ssl_init_fill), boost::asio::socket_base::message_peek, strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_receive, connection<t_protocol_handler>::shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); return; } // detect SSL if (m_ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { if (is_ssl((const unsigned char*)buffer_.data(), buffer_ssl_init_fill)) { MDEBUG("That looks like SSL"); m_ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_enabled; // read/write to the SSL socket } else { MDEBUG("That does not look like SSL"); m_ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_disabled; // read/write to the raw socket } } if (m_ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled) { // Handshake if (!handshake(boost::asio::ssl::stream_base::server)) { MERROR("SSL handshake failed"); boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); m_ready_to_close = true; bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(!m_send_que.size()) do_shutdown = true; CRITICAL_REGION_END(); if(do_shutdown) shutdown(); return; } } async_read_some(boost::asio::buffer(buffer_), strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); // If an error occurs then no new asynchronous operations are started. This // means that all shared_ptr references to the connection object will // disappear and the object will be destroyed automatically after this // handler returns. The connection class's destructor closes the socket. CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_receive", void()); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::call_run_once_service_io() { TRY_ENTRY(); if(!m_is_multithreaded) { //single thread model, we can wait in blocked call size_t cnt = GET_IO_SERVICE(socket()).run_one(); if(!cnt)//service is going to quit return false; }else { //multi thread model, we can't(!) wait in blocked call //so we make non blocking call and releasing CPU by calling sleep(0); //if no handlers were called //TODO: Maybe we need to have have critical section + event + callback to upper protocol to //ask it inside(!) critical region if we still able to go in event wait... size_t cnt = GET_IO_SERVICE(socket()).poll_one(); if(!cnt) misc_utils::sleep_no_w(1); } return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::do_send(byte_slice message) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if (!self) return false; if (m_was_shutdown) return false; // TODO avoid copy std::uint8_t const* const message_data = message.data(); const std::size_t message_size = message.size(); const double factor = 32; // TODO config typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); const t_safe chunksize_max = chunksize_good * 2 ; const bool allow_split = (m_connection_type == e_connection_type_RPC) ? false : true; // do not split RPC data CHECK_AND_ASSERT_MES(! (chunksize_max<0), false, "Negative chunksize_max" ); // make sure it is unsigned before removin sign with cast: long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ; if (allow_split && (message_size > chunksize_max_unsigned)) { { // LOCK: chunking epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** MDEBUG("do_send() will SPLIT into small chunks, from packet="<<message_size<<" B for ptr="<<message_data); // 01234567890 // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 // ^^^ (pos=8, len=4) ; // const size_t bufsize = chunksize_good; // TODO safecast // char* buf = new char[ bufsize ]; bool all_ok = true; while (!message.empty()) { byte_slice chunk = message.take_slice(chunksize_good); MDEBUG("chunk_start="<<(void*)chunk.data()<<" ptr="<<message_data<<" pos="<<(chunk.data() - message_data)); MDEBUG("part of " << message.size() << ": pos="<<(chunk.data() - message_data) << " len="<<chunk.size()); bool ok = do_send_chunk(std::move(chunk)); // <====== *** all_ok = all_ok && ok; if (!all_ok) { MDEBUG("do_send() DONE ***FAILED*** from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() SEND was aborted in middle of big package - this is mostly harmless " << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << message_size); return false; // partial failure in sending } // (in catch block, or uniq pointer) delete buf; } // each chunk MDEBUG("do_send() DONE SPLIT from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() m_connection_type = " << m_connection_type); return all_ok; // done - e.g. queued - all the chunks of current do_send call } // LOCK: chunking } // a big block (to be chunked) - all chunks else { // small block return do_send_chunk(std::move(message)); // just send as 1 big chunk } CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); } // do_send() //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::do_send_chunk(byte_slice chunk) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) return false; if(m_was_shutdown) return false; double current_speed_up; { CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); m_throttle_speed_out.handle_trafic_exact(chunk.size()); current_speed_up = m_throttle_speed_out.get_current_speed(); } context.m_current_speed_up = current_speed_up; context.m_max_speed_up = std::max(context.m_max_speed_up, current_speed_up); //_info("[sock " << socket().native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); context.m_send_cnt += chunk.size(); //some data should be wrote to stream //request complete // No sleeping here; sleeping is done once and for all in "handle_write" m_send_que_lock.lock(); // *** critical *** epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){m_send_que_lock.unlock();}); long int retry=0; const long int retry_limit = 5*4; while (m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) { retry++; /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO re-add fast stop _fact("ABORT queue wait due to stopping"); return false; // aborted }*/ using engine = std::mt19937; engine rng; std::random_device dev; std::seed_seq::result_type rand[engine::state_size]{}; // Use complete bit space std::generate_n(rand, engine::state_size, std::ref(dev)); std::seed_seq seed(rand, rand + engine::state_size); rng.seed(seed); long int ms = 250 + (rng() % 50); MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<chunk.size()); // XXX debug sleep m_send_que_lock.unlock(); boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); m_send_que_lock.lock(); _dbg1("sleep for queue: " << ms); if (retry > retry_limit) { MWARNING("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); shutdown(); return false; } } m_send_que.push_back(std::move(chunk)); if(m_send_que.size() > 1) { // active operation should be in progress, nothing to do, just wait last operation callback auto size_now = m_send_que.back().size(); MDEBUG("do_send_chunk() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function LOG_TRACE_CC(context, "[sock " << socket().native_handle() << "] Async send requested " << m_send_que.front().size()); } else { // no active operation if(m_send_que.size()!=1) { _erro("Looks like no active operations, but send que size != 1!!"); return false; } auto size_now = m_send_que.front().size(); MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B"); if (speed_limit_is_enabled()) do_send_handler_write( m_send_que.back().data(), m_send_que.back().size() ); // (((H))) CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size"); reset_timer(get_default_timeout(), false); async_write(boost::asio::buffer(m_send_que.front().data(), size_now ) , strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) ) ); //_dbg3("(chunk): " << size_now); //logger_handle_net_write(size_now); //_info("[sock " << socket().native_handle() << "] Async send requested " << m_send_que.front().size()); } //do_send_handler_stop( ptr , cb ); // empty function return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send_chunk", false); } // do_send_chunk //--------------------------------------------------------------------------------- template<class t_protocol_handler> boost::posix_time::milliseconds connection<t_protocol_handler>::get_default_timeout() { unsigned count; try { count = host_count(m_host); } catch (...) { count = 0; } const unsigned shift = get_state().sock_count > AGGRESSIVE_TIMEOUT_THRESHOLD ? std::min(std::max(count, 1u) - 1, 8u) : 0; boost::posix_time::milliseconds timeout(0); if (m_local) timeout = boost::posix_time::milliseconds(DEFAULT_TIMEOUT_MS_LOCAL >> shift); else timeout = boost::posix_time::milliseconds(DEFAULT_TIMEOUT_MS_REMOTE >> shift); return timeout; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> boost::posix_time::milliseconds connection<t_protocol_handler>::get_timeout_from_bytes_read(size_t bytes) { boost::posix_time::milliseconds ms = (boost::posix_time::milliseconds)(unsigned)(bytes * TIMEOUT_EXTRA_MS_PER_BYTE); ms += m_timer.expires_from_now(); if (ms > get_default_timeout()) ms = get_default_timeout(); return ms; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> unsigned int connection<t_protocol_handler>::host_count(const std::string &host, int delta) { static boost::mutex hosts_mutex; CRITICAL_REGION_LOCAL(hosts_mutex); static std::map<std::string, unsigned int> hosts; unsigned int &val = hosts[host]; if (delta > 0) MTRACE("New connection from host " << host << ": " << val); else if (delta < 0) MTRACE("Closed connection from host " << host << ": " << val); CHECK_AND_ASSERT_THROW_MES(delta >= 0 || val >= (unsigned)-delta, "Count would go negative"); CHECK_AND_ASSERT_THROW_MES(delta <= 0 || val <= std::numeric_limits<unsigned int>::max() - (unsigned)delta, "Count would wrap"); val += delta; return val; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::reset_timer(boost::posix_time::milliseconds ms, bool add) { MTRACE("Setting " << ms << " expiry"); auto self = safe_shared_from_this(); if(!self) { MERROR("Resetting timer on a dead object"); return; } if (m_was_shutdown) { MERROR("Setting timer on a shut down object"); return; } if (add) ms += m_timer.expires_from_now(); m_timer.expires_from_now(ms); m_timer.async_wait([=](const boost::system::error_code& ec) { if(ec == boost::asio::error::operation_aborted) return; MDEBUG(context << "connection timeout, closing"); self->close(); }); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::shutdown() { CRITICAL_REGION_BEGIN(m_shutdown_lock); if (m_was_shutdown) return true; m_was_shutdown = true; // Initiate graceful connection closure. m_timer.cancel(); boost::system::error_code ignored_ec; if (m_ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled) { const shared_state &state = static_cast<const shared_state&>(get_state()); if (!state.stop_signal_sent) socket_.shutdown(ignored_ec); } socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); if (!m_host.empty()) { try { host_count(m_host, -1); } catch (...) { /* ignore */ } m_host = ""; } CRITICAL_REGION_END(); m_protocol_handler.release_protocol(); return true; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::close() { TRY_ENTRY(); auto self = safe_shared_from_this(); if(!self) return false; //_info("[sock " << socket().native_handle() << "] Que Shutdown called."); m_timer.cancel(); size_t send_que_size = 0; CRITICAL_REGION_BEGIN(m_send_que_lock); send_que_size = m_send_que.size(); CRITICAL_REGION_END(); boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); if(!send_que_size) { shutdown(); } return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::close", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::send_done() { if (m_ready_to_close) return close(); m_ready_to_close = true; return true; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::cancel() { return close(); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::handle_write(const boost::system::error_code& e, size_t cb) { TRY_ENTRY(); LOG_TRACE_CC(context, "[sock " << socket().native_handle() << "] Async send calledback " << cb); if (e) { _dbg1("[sock " << socket().native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); shutdown(); return; } logger_handle_net_write(cb); // The single sleeping that is needed for correctly handling "out" speed throttling if (speed_limit_is_enabled()) { sleep_before_packet(cb, 1, 1); } bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(m_send_que.empty()) { _erro("[sock " << socket().native_handle() << "] m_send_que.size() == 0 at handle_write!"); return; } m_send_que.pop_front(); if(m_send_que.empty()) { if(boost::interprocess::ipcdetail::atomic_read32(&m_want_close_connection)) { do_shutdown = true; } }else { //have more data to send reset_timer(get_default_timeout(), false); auto size_now = m_send_que.front().size(); MDEBUG("handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size()); if (speed_limit_is_enabled()) do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H))) CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), void(), "Unexpected queue size"); async_write(boost::asio::buffer(m_send_que.front().data(), size_now) , strand_.wrap( boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2) ) ); //_dbg3("(normal)" << size_now); } CRITICAL_REGION_END(); if(do_shutdown) { shutdown(); } CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void()); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void connection<t_protocol_handler>::setRpcStation() { m_connection_type = e_connection_type_RPC; MDEBUG("set m_connection_type = RPC "); } template<class t_protocol_handler> bool connection<t_protocol_handler>::speed_limit_is_enabled() const { return m_connection_type != e_connection_type_RPC ; } /************************************************************************/ /* */ /************************************************************************/ template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) : m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), default_remote(), m_stop_signal_sent(false), m_port(0), m_threads_count(0), m_thread_index(0), m_connection_type( connection_type ), new_connection_(), new_connection_ipv6() { create_server_type_map(); m_thread_name_prefix = "NET"; } template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), default_remote(), m_stop_signal_sent(false), m_port(0), m_threads_count(0), m_thread_index(0), m_connection_type(connection_type), new_connection_(), new_connection_ipv6() { create_server_type_map(); m_thread_name_prefix = "NET"; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::~boosted_tcp_server() { this->send_stop_signal(); timed_wait_server_stop(10000); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::create_server_type_map() { server_type_map["NET"] = e_connection_type_NET; server_type_map["RPC"] = e_connection_type_RPC; server_type_map["P2P"] = e_connection_type_P2P; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string& address, uint32_t port_ipv6, const std::string& address_ipv6, bool use_ipv6, bool require_ipv4, ssl_options_t ssl_options) { TRY_ENTRY(); m_stop_signal_sent = false; m_port = port; m_port_ipv6 = port_ipv6; m_address = address; m_address_ipv6 = address_ipv6; m_use_ipv6 = use_ipv6; m_require_ipv4 = require_ipv4; if (ssl_options) m_state->configure_ssl(std::move(ssl_options)); std::string ipv4_failed = ""; std::string ipv6_failed = ""; try { boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(address, boost::lexical_cast<std::string>(port), boost::asio::ip::tcp::resolver::query::canonical_name); boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); acceptor_.open(endpoint.protocol()); #if !defined(_WIN32) acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); #endif acceptor_.bind(endpoint); acceptor_.listen(); boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint(); m_port = binded_endpoint.port(); MDEBUG("start accept (IPv4)"); new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, m_state->ssl_options().support)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4, this, boost::asio::placeholders::error)); } catch (const std::exception &e) { ipv4_failed = e.what(); } if (ipv4_failed != "") { MERROR("Failed to bind IPv4: " << ipv4_failed); if (require_ipv4) { throw std::runtime_error("Failed to bind IPv4 (set to required)"); } } if (use_ipv6) { try { if (port_ipv6 == 0) port_ipv6 = port; // default arg means bind to same port as ipv4 boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(address_ipv6, boost::lexical_cast<std::string>(port_ipv6), boost::asio::ip::tcp::resolver::query::canonical_name); boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); acceptor_ipv6.open(endpoint.protocol()); #if !defined(_WIN32) acceptor_ipv6.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); #endif acceptor_ipv6.set_option(boost::asio::ip::v6_only(true)); acceptor_ipv6.bind(endpoint); acceptor_ipv6.listen(); boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_ipv6.local_endpoint(); m_port_ipv6 = binded_endpoint.port(); MDEBUG("start accept (IPv6)"); new_connection_ipv6.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, m_state->ssl_options().support)); acceptor_ipv6.async_accept(new_connection_ipv6->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6, this, boost::asio::placeholders::error)); } catch (const std::exception &e) { ipv6_failed = e.what(); } } if (use_ipv6 && ipv6_failed != "") { MERROR("Failed to bind IPv6: " << ipv6_failed); if (ipv4_failed != "") { throw std::runtime_error("Failed to bind IPv4 and IPv6"); } } return true; } catch (const std::exception &e) { MFATAL("Error starting server: " << e.what()); return false; } catch (...) { MFATAL("Error starting server"); return false; } } //----------------------------------------------------------------------------- PUSH_WARNINGS DISABLE_GCC_WARNING(maybe-uninitialized) template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::init_server(const std::string port, const std::string& address, const std::string port_ipv6, const std::string address_ipv6, bool use_ipv6, bool require_ipv4, ssl_options_t ssl_options) { uint32_t p = 0; uint32_t p_ipv6 = 0; if (port.size() && !string_tools::get_xtype_from_string(p, port)) { MERROR("Failed to convert port no = " << port); return false; } if (port_ipv6.size() && !string_tools::get_xtype_from_string(p_ipv6, port_ipv6)) { MERROR("Failed to convert port no = " << port_ipv6); return false; } return this->init_server(p, address, p_ipv6, address_ipv6, use_ipv6, require_ipv4, std::move(ssl_options)); } POP_WARNINGS //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::worker_thread() { TRY_ENTRY(); uint32_t local_thr_index = boost::interprocess::ipcdetail::atomic_inc32(&m_thread_index); std::string thread_name = std::string("[") + m_thread_name_prefix; thread_name += boost::to_string(local_thr_index) + "]"; MLOG_SET_THREAD_NAME(thread_name); // _fact("Thread name: " << m_thread_name_prefix); while(!m_stop_signal_sent) { try { io_service_.run(); return true; } catch(const std::exception& ex) { _erro("Exception at server worker thread, what=" << ex.what()); } catch(...) { _erro("Exception at server worker thread, unknown execption"); } } //_info("Worker thread finished"); return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name) { m_thread_name_prefix = prefix_name; auto it = server_type_map.find(m_thread_name_prefix); if (it==server_type_map.end()) throw std::runtime_error("Unknown prefix/server type:" + std::string(prefix_name)); auto connection_type = it->second; // the value of type MINFO("Set server type to: " << connection_type << " from name: " << m_thread_name_prefix << ", prefix_name = " << prefix_name); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::set_connection_filter(i_connection_filter* pfilter) { assert(m_state != nullptr); // always set in constructor m_state->pfilter = pfilter; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::run_server(size_t threads_count, bool wait, const boost::thread::attributes& attrs) { TRY_ENTRY(); m_threads_count = threads_count; m_main_thread_id = boost::this_thread::get_id(); MLOG_SET_THREAD_NAME("[SRV_MAIN]"); while(!m_stop_signal_sent) { // Create a pool of threads to run all of the io_services. CRITICAL_REGION_BEGIN(m_threads_lock); for (std::size_t i = 0; i < threads_count; ++i) { boost::shared_ptr<boost::thread> thread(new boost::thread( attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this))); _note("Run server thread name: " << m_thread_name_prefix); m_threads.push_back(thread); } CRITICAL_REGION_END(); // Wait for all threads in the pool to exit. if (wait) { _fact("JOINING all threads"); for (std::size_t i = 0; i < m_threads.size(); ++i) { m_threads[i]->join(); } _fact("JOINING all threads - almost"); m_threads.clear(); _fact("JOINING all threads - DONE"); } else { _dbg1("Reiniting OK."); return true; } if(wait && !m_stop_signal_sent) { //some problems with the listening socket ?.. _dbg1("Net service stopped without stop request, restarting..."); if(!this->init_server(m_port, m_address, m_port_ipv6, m_address_ipv6, m_use_ipv6, m_require_ipv4)) { _dbg1("Reiniting service failed, exit."); return false; }else { _dbg1("Reiniting OK."); } } } return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::run_server", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::is_thread_worker() { TRY_ENTRY(); CRITICAL_REGION_LOCAL(m_threads_lock); BOOST_FOREACH(boost::shared_ptr<boost::thread>& thp, m_threads) { if(thp->get_id() == boost::this_thread::get_id()) return true; } if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id) return true; return false; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::is_thread_worker", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::timed_wait_server_stop(uint64_t wait_mseconds) { TRY_ENTRY(); boost::chrono::milliseconds ms(wait_mseconds); for (std::size_t i = 0; i < m_threads.size(); ++i) { if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) { _dbg1("Interrupting thread " << m_threads[i]->native_handle()); m_threads[i]->interrupt(); } } return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::timed_wait_server_stop", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::send_stop_signal() { m_stop_signal_sent = true; typename connection<t_protocol_handler>::shared_state *state = static_cast<typename connection<t_protocol_handler>::shared_state*>(m_state.get()); state->stop_signal_sent = true; TRY_ENTRY(); connections_mutex.lock(); for (auto &c: connections_) { c->cancel(); } connections_.clear(); connections_mutex.unlock(); io_service_.stop(); CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void()); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4(const boost::system::error_code& e) { this->handle_accept(e, false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6(const boost::system::error_code& e) { this->handle_accept(e, true); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::handle_accept(const boost::system::error_code& e, bool ipv6) { MDEBUG("handle_accept"); boost::asio::ip::tcp::acceptor* current_acceptor = &acceptor_; connection_ptr* current_new_connection = &new_connection_; auto accept_function_pointer = &boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4; if (ipv6) { current_acceptor = &acceptor_ipv6; current_new_connection = &new_connection_ipv6; accept_function_pointer = &boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6; } try { if (!e) { if (m_connection_type == e_connection_type_RPC) { const char *ssl_message = "unknown"; switch ((*current_new_connection)->get_ssl_support()) { case epee::net_utils::ssl_support_t::e_ssl_support_disabled: ssl_message = "disabled"; break; case epee::net_utils::ssl_support_t::e_ssl_support_enabled: ssl_message = "enabled"; break; case epee::net_utils::ssl_support_t::e_ssl_support_autodetect: ssl_message = "autodetection"; break; } MDEBUG("New server for RPC connections, SSL " << ssl_message); (*current_new_connection)->setRpcStation(); // hopefully this is not needed actually } connection_ptr conn(std::move((*current_new_connection))); (*current_new_connection).reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, conn->get_ssl_support())); current_acceptor->async_accept((*current_new_connection)->socket(), boost::bind(accept_function_pointer, this, boost::asio::placeholders::error)); boost::asio::socket_base::keep_alive opt(true); conn->socket().set_option(opt); bool res; if (default_remote.get_type_id() == net_utils::address_type::invalid) res = conn->start(true, 1 < m_threads_count); else res = conn->start(true, 1 < m_threads_count, default_remote); if (!res) { conn->cancel(); return; } conn->save_dbg_log(); return; } else { MERROR("Error in boosted_tcp_server<t_protocol_handler>::handle_accept: " << e); } } catch (const std::exception &e) { MERROR("Exception in boosted_tcp_server<t_protocol_handler>::handle_accept: " << e.what()); } // error path, if e or exception assert(m_state != nullptr); // always set in constructor _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count); misc_utils::sleep_no_w(100); (*current_new_connection).reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, (*current_new_connection)->get_ssl_support())); current_acceptor->async_accept((*current_new_connection)->socket(), boost::bind(accept_function_pointer, this, boost::asio::placeholders::error)); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::add_connection(t_connection_context& out, boost::asio::ip::tcp::socket&& sock, network_address real_remote, epee::net_utils::ssl_support_t ssl_support) { if(std::addressof(get_io_service()) == std::addressof(GET_IO_SERVICE(sock))) { connection_ptr conn(new connection<t_protocol_handler>(std::move(sock), m_state, m_connection_type, ssl_support)); if(conn->start(false, 1 < m_threads_count, std::move(real_remote))) { conn->get_context(out); conn->save_dbg_log(); return true; } } else { MWARNING(out << " was not added, socket/io_service mismatch"); } return false; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> typename boosted_tcp_server<t_protocol_handler>::try_connect_result_t boosted_tcp_server<t_protocol_handler>::try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); sock_.open(remote_endpoint.protocol()); if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" ) { boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(bind_ip.c_str()), 0); boost::system::error_code ec; sock_.bind(local_endpoint, ec); if (ec) { MERROR("Error binding to " << bind_ip << ": " << ec.message()); if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } } /* NOTICE: be careful to make sync connection from event handler: in case if all threads suddenly do sync connect, there will be no thread to dispatch events from io service. */ boost::system::error_code ec = boost::asio::error::would_block; //have another free thread(s), work in wait mode, without event handling struct local_async_context { boost::system::error_code ec; boost::mutex connect_mut; boost::condition_variable cond; }; boost::shared_ptr<local_async_context> local_shared_context(new local_async_context()); local_shared_context->ec = boost::asio::error::would_block; boost::unique_lock<boost::mutex> lock(local_shared_context->connect_mut); auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr<local_async_context> shared_context) { shared_context->connect_mut.lock(); shared_context->ec = ec_; shared_context->cond.notify_one(); shared_context->connect_mut.unlock(); }; sock_.async_connect(remote_endpoint, boost::bind<void>(connect_callback, _1, local_shared_context)); while(local_shared_context->ec == boost::asio::error::would_block) { bool r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout)); if (m_stop_signal_sent) { if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } if(local_shared_context->ec == boost::asio::error::would_block && !r) { //timeout sock_.close(); _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); return CONNECT_FAILURE; } } ec = local_shared_context->ec; if (ec || !sock_.is_open()) { _dbg3("Some problems at connect, message: " << ec.message()); if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } _dbg3("Connected success to " << adr << ':' << port); const ssl_support_t ssl_support = new_connection_l->get_ssl_support(); if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { // Handshake MDEBUG("Handshaking SSL..."); if (!new_connection_l->handshake(boost::asio::ssl::stream_base::client)) { if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); sock_.close(); return CONNECT_NO_SSL; } MERROR("SSL handshake failed"); if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } } return CONNECT_SUCCESS; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::try_connect", CONNECT_FAILURE); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, ssl_support) ); connections_mutex.lock(); connections_.insert(new_connection_l); MDEBUG("connections_ size now " << connections_.size()); connections_mutex.unlock(); epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ CRITICAL_REGION_LOCAL(connections_mutex); connections_.erase(new_connection_l); }); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); bool try_ipv6 = false; boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); boost::system::error_code resolve_error; boost::asio::ip::tcp::resolver::iterator iterator; try { //resolving ipv4 address as ipv6 throws, catch here and move on iterator = resolver.resolve(query, resolve_error); } catch (const boost::system::system_error& e) { if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found && resolve_error != boost::asio::error::host_not_found_try_again)) { throw; } try_ipv6 = true; } catch (...) { throw; } std::string bind_ip_to_use; boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { if (!m_use_ipv6) { _erro("Failed to resolve " << adr); return false; } else { try_ipv6 = true; MINFO("Resolving address as IPv4 failed, trying IPv6"); } } else { bind_ip_to_use = bind_ip; } if (try_ipv6) { boost::asio::ip::tcp::resolver::query query6(boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); iterator = resolver.resolve(query6, resolve_error); if(iterator == end) { _erro("Failed to resolve " << adr); return false; } else { if (bind_ip == "0.0.0.0") { bind_ip_to_use = "::"; } else { bind_ip_to_use = ""; } } } MDEBUG("Trying to connect to " << adr << ":" << port << ", bind_ip = " << bind_ip_to_use); //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port); boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); auto try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, ssl_support); if (try_connect_result == CONNECT_FAILURE) return false; if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect && try_connect_result == CONNECT_NO_SSL) { // we connected, but could not connect with SSL, try without MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); new_connection_l->disable_ssl(); try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, epee::net_utils::ssl_support_t::e_ssl_support_disabled); if (try_connect_result != CONNECT_SUCCESS) return false; } // start adds the connection to the config object's list, so we don't need to have it locally anymore connections_mutex.lock(); connections_.erase(new_connection_l); connections_mutex.unlock(); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter)); } else { assert(m_state != nullptr); // always set in constructor _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_state->sock_count); } new_connection_l->save_dbg_log(); return r; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect", false); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> template<class t_callback> bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, ssl_support) ); connections_mutex.lock(); connections_.insert(new_connection_l); MDEBUG("connections_ size now " << connections_.size()); connections_mutex.unlock(); epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ CRITICAL_REGION_LOCAL(connections_mutex); connections_.erase(new_connection_l); }); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); bool try_ipv6 = false; boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); boost::system::error_code resolve_error; boost::asio::ip::tcp::resolver::iterator iterator; try { //resolving ipv4 address as ipv6 throws, catch here and move on iterator = resolver.resolve(query, resolve_error); } catch (const boost::system::system_error& e) { if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found && resolve_error != boost::asio::error::host_not_found_try_again)) { throw; } try_ipv6 = true; } catch (...) { throw; } boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { if (!try_ipv6) { _erro("Failed to resolve " << adr); return false; } else { MINFO("Resolving address as IPv4 failed, trying IPv6"); } } if (try_ipv6) { boost::asio::ip::tcp::resolver::query query6(boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); iterator = resolver.resolve(query6, resolve_error); if(iterator == end) { _erro("Failed to resolve " << adr); return false; } } boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); sock_.open(remote_endpoint.protocol()); if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" ) { boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(bind_ip.c_str()), 0); boost::system::error_code ec; sock_.bind(local_endpoint, ec); if (ec) { MERROR("Error binding to " << bind_ip << ": " << ec.message()); if (sock_.is_open()) sock_.close(); return false; } } boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_service_)); //start deadline sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout)); sh_deadline->async_wait([=](const boost::system::error_code& error) { if(error != boost::asio::error::operation_aborted) { _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); new_connection_l->socket().close(); } }); //start async connect sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_) { t_connection_context conn_context = AUTO_VAL_INIT(conn_context); boost::system::error_code ignored_ec; boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec); if(!ec_) {//success if(!sh_deadline->cancel()) { cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation }else { _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port()); // start adds the connection to the config object's list, so we don't need to have it locally anymore connections_mutex.lock(); connections_.erase(new_connection_l); connections_mutex.unlock(); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); cb(conn_context, ec_); } else { _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); cb(conn_context, boost::asio::error::fault); } } }else { _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); cb(conn_context, ec_); } }); return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false); } } // namespace } // namespace PRAGMA_WARNING_POP