// Copyright (c) 2014-2018, The Monero Project
// 
// All rights reserved.
// 
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
// 
// 1. Redistributions of source code must retain the above copyright notice, this list of
//    conditions and the following disclaimer.
// 
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
//    of conditions and the following disclaimer in the documentation and/or other
//    materials provided with the distribution.
// 
// 3. Neither the name of the copyright holder nor the names of its contributors may be
//    used to endorse or promote products derived from this software without specific
//    prior written permission.
// 
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// 
// Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers

#include <atomic>
#include <chrono>
#include <functional>
#include <numeric>
#include <boost/thread/thread.hpp>
#include <vector>

#include "gtest/gtest.h"

#include "include_base_utils.h"
#include "misc_language.h"
#include "misc_log_ex.h"
#include "storages/levin_abstract_invoke2.h"

#include "net_load_tests.h"

using namespace net_load_tests;

namespace
{
  const size_t CONNECTION_COUNT = 100000;
  const size_t CONNECTION_TIMEOUT = 10000;
  const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
  const size_t RESERVED_CONN_CNT = 1;

  template<typename t_predicate>
  bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10)
  {
    for (size_t i = 0; i < timeout_ms / sleep_ms; ++i)
    {
      if (predicate())
        return true;
      //std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
      epee::misc_utils::sleep_no_w(static_cast<long>(sleep_ms));
    }
    return false;
  }

  class t_connection_opener_1
  {
  public:
    t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target)
      : m_tcp_server(tcp_server)
      , m_open_request_target(open_request_target)
      , m_next_id(0)
      , m_error_count(0)
      , m_connections(open_request_target)
    {
      for (auto& conn_id : m_connections)
        conn_id = boost::uuids::nil_uuid();
    }

    bool open()
    {
      size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
      if (m_open_request_target <= id)
        return false;

      bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
        if (!ec)
        {
          m_connections[id] = context.m_connection_id;
        }
        else
        {
          m_error_count.fetch_add(1, std::memory_order_relaxed);
        }
      });

      if (!r)
      {
        m_error_count.fetch_add(1, std::memory_order_relaxed);
      }

      return true;
    }

    bool close(size_t id)
    {
      if (!m_connections[id].is_nil())
      {
        m_tcp_server.get_config_object().close(m_connections[id]);
        return true;
      }
      else
      {
        return false;
      }
    }

    size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }

  private:
    test_tcp_server& m_tcp_server;
    size_t m_open_request_target;
    std::atomic<size_t> m_next_id;
    std::atomic<size_t> m_error_count;
    std::vector<boost::uuids::uuid> m_connections;
  };

  class t_connection_opener_2
  {
  public:
    t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
      : m_tcp_server(tcp_server)
      , m_open_request_target(open_request_target)
      , m_open_request_count(0)
      , m_error_count(0)
      , m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
    {
    }

    bool open_and_close()
    {
      size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
      if (m_open_request_target <= req_count)
        return false;

      bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
        if (!ec)
        {
          m_open_close_test_helper.handle_new_connection(context.m_connection_id);
        }
        else
        {
          m_error_count.fetch_add(1, std::memory_order_relaxed);
        }
      });

      if (!r)
      {
        m_error_count.fetch_add(1, std::memory_order_relaxed);
      }

      return true;
    }

    void close_remaining_connections()
    {
      m_open_close_test_helper.close_remaining_connections();
    }

    size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); }
    size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }

  private:
    test_tcp_server& m_tcp_server;
    size_t m_open_request_target;
    std::atomic<size_t> m_open_request_count;
    std::atomic<size_t> m_error_count;
    open_close_test_helper m_open_close_test_helper;
  };

  class net_load_test_clt : public ::testing::Test
  {
  public:
    net_load_test_clt()
    : m_tcp_server(epee::net_utils::e_connection_type_RPC) // RPC disables network limit for unit tests
    {
	}
  protected:
    virtual void SetUp()
    {
      m_thread_count = (std::max)(min_thread_count, boost::thread::hardware_concurrency() / 2);

      m_tcp_server.get_config_object().set_handler(&m_commands_handler);
      m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;

      ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1"));
      ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false));

      // Connect to server
      std::atomic<int> conn_status(0);
      m_cmd_conn_id = boost::uuids::nil_uuid();
      ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
        if (!ec)
        {
          m_cmd_conn_id = context.m_connection_id;
        }
        else
        {
          LOG_ERROR("Connection error: " << ec.message());
        }
        conn_status.store(1, std::memory_order_seq_cst);
      }));

      EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
      ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
      ASSERT_FALSE(m_cmd_conn_id.is_nil());

      conn_status.store(0, std::memory_order_seq_cst);
      CMD_RESET_STATISTICS::request req;
      ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req,
        m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
          conn_status.store(code, std::memory_order_seq_cst);
      }));

      EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out";
      ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
    }

    virtual void TearDown()
    {
      m_tcp_server.send_stop_signal();
      ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
    }

    static void TearDownTestCase()
    {
      // Stop server
      test_levin_commands_handler *commands_handler_ptr = new test_levin_commands_handler();
      test_levin_commands_handler &commands_handler = *commands_handler_ptr;
      test_tcp_server tcp_server(epee::net_utils::e_connection_type_RPC);
      tcp_server.get_config_object().set_handler(commands_handler_ptr, [](epee::levin::levin_commands_handler<test_connection_context> *handler)->void { delete handler; });
      tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;

      if (!tcp_server.init_server(clt_port, "127.0.0.1")) return;
      if (!tcp_server.run_server(2, false)) return;

      // Connect to server and invoke shutdown command
      std::atomic<int> conn_status(0);
      boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
      tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
        cmd_conn_id = context.m_connection_id;
        conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
      });

      if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
      if (1 != conn_status.load(std::memory_order_seq_cst)) return;

      epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object());

      busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
    }

    template<typename Func>
    static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func())
    {
      func();
    }

    template<typename Func>
    static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index))
    {
      func(thread_index);
    }

    template<typename Func>
    void parallel_exec(const Func& func)
    {
      unit_test::call_counter properly_finished_threads;
      std::vector<boost::thread> threads(m_thread_count);
      for (size_t i = 0; i < threads.size(); ++i)
      {
        threads[i] = boost::thread([&, i] {
          call_func(i, func, 0);
          properly_finished_threads.inc();
        });
      }

      for (auto& th : threads)
        th.join();

      ASSERT_EQ(properly_finished_threads.get(), m_thread_count);
    }

    void get_server_statistics(CMD_GET_STATISTICS::response& statistics)
    {
      std::atomic<int> req_status(0);
      CMD_GET_STATISTICS::request req;
      ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req,
        m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
          if (0 < code)
          {
            statistics = rsp;
          }
          else
          {
            LOG_ERROR("Get server statistics error: " << code);
          }
          req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
      }));

      EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out";
      ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
    }

    template <typename t_predicate>
    bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate)
    {
      for (size_t i = 0; i < 30; ++i)
      {
        get_server_statistics(statistics);
        if (predicate(statistics))
        {
          return true;
        }

        //std::this_thread::sleep_for(std::chrono::seconds(1));
        epee::misc_utils::sleep_no_w(1000);
      }

      return false;
    }

    void ask_for_data_requests(size_t request_size = 0)
    {
      CMD_SEND_DATA_REQUESTS::request req;
      req.request_size = request_size;
      epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
    }

  protected:
    test_tcp_server m_tcp_server;
    test_levin_commands_handler m_commands_handler;
    size_t m_thread_count;
    boost::uuids::uuid m_cmd_conn_id;
  };
}

TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
{
  // Open connections
  t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
  parallel_exec([&] {
    while (connection_opener.open());
  });

  // Wait for all open requests to complete
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
    " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");

  // Check
  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
  ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());

  // Close connections
  parallel_exec([&](size_t thread_idx) {
    for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
    {
      connection_opener.close(i);
    }
  });

  // Wait for all opened connections to close
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
    " / " << m_commands_handler.close_connection_counter());

  // Check all connections are closed
  ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());

  // Wait for server to handle all open and close requests
  CMD_GET_STATISTICS::response srv_stat;
  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());

  // Check server status
  // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
  ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
  ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);

  // Request data from server, it causes to close rest connections
  ask_for_data_requests();

  // Wait for server to close rest connections
  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());

  // Check server status. All connections should be closed
  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
}

TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
{
  // Open connections
  t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
  parallel_exec([&] {
    while (connection_opener.open());
  });

  // Wait for all open requests to complete
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
    " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");

  // Check
  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
  ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());

  // Wait for server accepts all connections
  CMD_GET_STATISTICS::response srv_stat;
  int last_new_connection_counter = -1;
  busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
    if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
    else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
  });

  // Close connections
  CMD_CLOSE_ALL_CONNECTIONS::request req;
  ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));

  // Wait for all opened connections to close
  busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
    " / " << m_commands_handler.close_connection_counter());

  // It's OK, if server didn't close all connections, because it could accept not all our connections
  ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
  ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());

  // Wait for server to handle all open and close requests
  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());

  // Check server status
  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);

  // Close rest connections
  m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
    if (ctx.m_connection_id != m_cmd_conn_id)
    {
      CMD_DATA_REQUEST::request req;
      bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
        m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
          if (code <= 0)
          {
            LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
          }
      });
      if (!r)
        LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
    }
    return true;
  });

  // Wait for all opened connections to close
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
    " / " << m_commands_handler.close_connection_counter());

  // Check
  ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
}

TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
{
  static const size_t MAX_OPENED_CONN_COUNT = 100;

  // Open/close connections
  t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
  parallel_exec([&] {
    while (connection_opener.open_and_close());
  });

  // Wait for all open requests to complete
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
    " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");

  // Check
  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);

  // Wait for all close requests to complete
  EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
  LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());

  // Check
  ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());

  connection_opener.close_remaining_connections();

  // Wait for all close requests to complete
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
  LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());

  ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
  ASSERT_EQ(0, connection_opener.opened_connection_count());
  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());

  // Wait for server to handle all open and close requests
  CMD_GET_STATISTICS::response srv_stat;
  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());

  // Check server status
  // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
  ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
  ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);

  // Request data from server, it causes to close rest connections
  ask_for_data_requests();

  // Wait for server to close rest connections
  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());

  // Check server status. All connections should be closed
  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
}

TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
{
  static const size_t MAX_OPENED_CONN_COUNT = 100;

  // Init test
  std::atomic<int> test_state(0);
  CMD_START_OPEN_CLOSE_TEST::request req_start;
  req_start.open_request_target = CONNECTION_COUNT;
  req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
  ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
    m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
      test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
  }));

  // Wait for server response
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); }));
  ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));

  // Open connections
  t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
  parallel_exec([&] {
    while (connection_opener.open());
  });

  // Wait for all open requests to complete
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
    " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
  LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());

  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);

  // Wait for server accepts all connections
  CMD_GET_STATISTICS::response srv_stat;
  int last_new_connection_counter = -1;
  busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
    if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
    else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
  });

  // Ask server to close rest connections
  CMD_CLOSE_ALL_CONNECTIONS::request req;
  ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));

  // Wait for almost all connections to be closed by server
  busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });

  // It's OK, if there are opened connections, because server could accept not all our connections
  ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
  ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());

  // Wait for server to handle all open and close requests
  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());

  // Check server status
  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);

  // Close rest connections
  m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
    if (ctx.m_connection_id != m_cmd_conn_id)
    {
      CMD_DATA_REQUEST::request req;
      bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
        m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
          if (code <= 0)
          {
            LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
          }
      });
      if (!r)
        LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
    }
    return true;
  });

  // Wait for all opened connections to close
  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
    " / " << m_commands_handler.close_connection_counter());

  // Check
  ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
}

int main(int argc, char** argv)
{
  tools::on_startup();
  epee::debug::get_set_enable_assert(true, false);
  //set up logging options
  mlog_configure(mlog_get_default_log_path("net_load_tests_clt.log"), true);

  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}