diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index f2aabfc..c8dd299 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -424,9 +424,10 @@ jobs: strategy: matrix: config: - - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "ON", upnp: "ON"} - - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "ON"} - - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "OFF"} + - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "ON", upnp: "ON", grpc: "ON"} + - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "ON", upnp: "ON", grpc: "OFF"} + - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "ON", grpc: "OFF"} + - {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "OFF", grpc: "OFF"} steps: - name: Checkout repository @@ -441,7 +442,7 @@ jobs: run: | mkdir build cd build - cmake .. -G "${{ matrix.config.vs }}" -DCMAKE_SYSTEM_VERSION="10.0" -DWITH_RANDOMX=${{ matrix.config.rx }} -DWITH_UPNP=${{ matrix.config.upnp }} + cmake .. -G "${{ matrix.config.vs }}" -DCMAKE_SYSTEM_VERSION="10.0" -DWITH_RANDOMX=${{ matrix.config.rx }} -DWITH_UPNP=${{ matrix.config.upnp }} -DWITH_GRPC=${{ matrix.config.grpc }} & "${{ matrix.config.vspath }}\\MSBuild\\Current\\Bin\\amd64\\msbuild" -v:m /m /p:Configuration=Release p2pool.vcxproj - name: Check Windows 7 compatibility @@ -477,7 +478,7 @@ jobs: - name: Archive binary uses: actions/upload-artifact@v4 with: - name: p2pool-vs-${{ matrix.config.os }}-randomx-${{ matrix.config.rx }}-upnp-${{ matrix.config.upnp }}.exe + name: p2pool-vs-${{ matrix.config.os }}-randomx-${{ matrix.config.rx }}-upnp-${{ matrix.config.upnp }}-grpc-${{ matrix.config.grpc }}.exe path: build/Release/p2pool.exe build-macos: diff --git a/CMakeLists.txt b/CMakeLists.txt index bc2ddb6..8d0b4af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ option(STATIC_LIBS "Link libuv and libzmq statically" OFF) option(WITH_RANDOMX "Include the RandomX library in the build. If this is turned off, p2pool will rely on monerod for verifying RandomX hashes" ON) option(WITH_LTO "Use link-time compiler optimization (if linking fails for you, run cmake with -DWITH_LTO=OFF)" ON) option(WITH_UPNP "Include UPnP support. If this is turned off, p2pool will not be able to configure port forwarding on UPnP-enabled routers." ON) +option(WITH_GRPC "Include gRPC support. If this is turned off, p2pool will not be able to merge mine with Tari." ON) option(DEV_TEST_SYNC "[Developer only] Sync test, stop p2pool after sync is complete" OFF) option(DEV_WITH_TSAN "[Developer only] Compile with thread sanitizer" OFF) @@ -31,10 +32,13 @@ if (CMAKE_CXX_COMPILER_ID MATCHES MSVC) set(CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION 10.0) endif() -include(cmake/grpc.cmake) +if (WITH_GRPC) + add_definitions(-DWITH_GRPC) + include(cmake/grpc.cmake) -add_subdirectory(external/src/Tari) -set(LIBS ${LIBS} Tari_gRPC) + add_subdirectory(external/src/Tari) + set(LIBS ${LIBS} Tari_gRPC) +endif() if (WITH_RANDOMX) add_definitions(-DWITH_RANDOMX) @@ -94,7 +98,6 @@ set(HEADERS src/mempool.h src/merge_mining_client.h src/merge_mining_client_json_rpc.h - src/merge_mining_client_tari.h src/merkle.h src/p2p_server.h src/p2pool.h @@ -129,7 +132,6 @@ set(SOURCES src/mempool.cpp src/merge_mining_client.cpp src/merge_mining_client_json_rpc.cpp - src/merge_mining_client_tari.cpp src/merkle.cpp src/p2p_server.cpp src/p2pool.cpp @@ -150,6 +152,11 @@ if (WITH_RANDOMX) set(SOURCES ${SOURCES} src/miner.cpp) endif() +if (WITH_GRPC) + set(HEADERS ${HEADERS} src/merge_mining_client_tari.h) + set(SOURCES ${SOURCES} src/merge_mining_client_tari.cpp) +endif() + source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" PREFIX "Header Files" FILES ${HEADERS}) source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" PREFIX "Source Files" FILES ${SOURCES}) @@ -379,7 +386,9 @@ if (STATIC_BINARY OR STATIC_LIBS) set(STATIC_LIBS ${STATIC_LIBS} resolv) endif() - set(STATIC_LIBS ${STATIC_LIBS} Tari_gRPC grpc grpc++ libprotobuf) + if (WITH_GRPC) + set(STATIC_LIBS ${STATIC_LIBS} Tari_gRPC grpc grpc++ libprotobuf) + endif() target_link_libraries(${CMAKE_PROJECT_NAME} "${CMAKE_SOURCE_DIR}/external/src/libzmq/build/lib/libzmq.a" diff --git a/external/src/Tari/CMakeLists.txt b/external/src/Tari/CMakeLists.txt index c39b69a..626322c 100644 --- a/external/src/Tari/CMakeLists.txt +++ b/external/src/Tari/CMakeLists.txt @@ -38,7 +38,7 @@ include_directories(../grpc/third_party/abseil-cpp) if (CMAKE_CXX_COMPILER_ID MATCHES MSVC) set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /W0 /Zi /Od /Ob0 /MP /MTd") - set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /W0 /Zi /Od /Ob0 /MP /MTd") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /W0 /Zi /Od /Ob0 /MP /MTd") set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} /W0 /O1 /Ob2 /Oi /Os /Oy /MP /GL /MT") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /W0 /O1 /Ob2 /Oi /Os /Oy /MP /GL /MT") else() diff --git a/src/merge_mining_client.cpp b/src/merge_mining_client.cpp index e100e47..828436c 100644 --- a/src/merge_mining_client.cpp +++ b/src/merge_mining_client.cpp @@ -19,7 +19,7 @@ #include "merge_mining_client.h" #include "merge_mining_client_json_rpc.h" -#ifndef P2POOL_UNIT_TESTS +#if defined(WITH_GRPC) && !defined(P2POOL_UNIT_TESTS) #include "merge_mining_client_tari.h" #endif @@ -28,7 +28,7 @@ namespace p2pool { IMergeMiningClient* IMergeMiningClient::create(p2pool* pool, const std::string& host, const std::string& wallet) noexcept { try { -#ifndef P2POOL_UNIT_TESTS +#if defined(WITH_GRPC) && !defined(P2POOL_UNIT_TESTS) if (host.find(MergeMiningClientTari::TARI_PREFIX) == 0) { return new MergeMiningClientTari(pool, host, wallet); } diff --git a/src/merge_mining_client_tari.cpp b/src/merge_mining_client_tari.cpp index 280569c..bda0429 100644 --- a/src/merge_mining_client_tari.cpp +++ b/src/merge_mining_client_tari.cpp @@ -20,40 +20,64 @@ #include "merge_mining_client_tari.h" #include "p2pool.h" #include "params.h" -#include "Tari/proto.h" LOG_CATEGORY(MergeMiningClientTari) +using namespace tari::rpc; + namespace p2pool { -MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, const std::string& host, const std::string& wallet) - : m_server(new gRPC_Server(pool->params().m_socks5Proxy)) - , m_host(host) - , m_port(0) +MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, const std::string& wallet) + : m_chainParams{} , m_auxWallet(wallet) , m_pool(pool) + , m_server(new TariServer(pool->params().m_socks5Proxy)) + , m_hostStr(host) { if (host.find(TARI_PREFIX) != 0) { LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found"); throw std::exception(); } - const size_t k = host.find_last_of(':'); - if (k != std::string::npos) { - m_host = host.substr(sizeof(TARI_PREFIX) - 1, k - (sizeof(TARI_PREFIX) - 1)); - m_port = std::stoul(host.substr(k + 1), nullptr, 10); + host.erase(0, sizeof(TARI_PREFIX) - 1); + + while (host.back() == '/') { + host.pop_back(); } - if (m_host.empty() || (m_port == 0) || (m_port >= 65536)) { + if (host.empty()) { + LOGERR(1, "Invalid host"); + throw std::exception(); + } + + m_server->parse_address_list(host, + [this](bool is_v6, const std::string& /*address*/, std::string ip, int port) + { + if (!m_pool->params().m_dns || resolve_host(ip, is_v6)) { + m_server->m_TariNodeIsV6 = is_v6; + m_server->m_TariNodeHost = ip; + m_server->m_TariNodePort = port; + } + }); + + if (m_server->m_TariNodeHost.empty() || (m_server->m_TariNodePort == 0) || (m_server->m_TariNodePort >= 65536)) { LOGERR(1, "Invalid host " << host); throw std::exception(); } uv_rwlock_init_checked(&m_lock); - if (!m_server->start(m_pool->params().m_dns, m_host, m_port)) { + if (!m_server->start()) { throw std::exception(); } + + char buf[32] = {}; + log::Stream s(buf); + s << "127.0.0.1:" << m_server->external_listen_port(); + + m_TariNode = new BaseNode::Stub(grpc::CreateChannel(buf, grpc::InsecureChannelCredentials())); + + merge_mining_get_chain_id(); } MergeMiningClientTari::~MergeMiningClientTari() @@ -61,6 +85,8 @@ MergeMiningClientTari::~MergeMiningClientTari() m_server->shutdown_tcp(); delete m_server; + delete m_TariNode; + LOGINFO(1, "stopped"); } @@ -82,68 +108,88 @@ void MergeMiningClientTari::submit_solution(const std::vector& blob, co (void)merkle_proof; } -MergeMiningClientTari::gRPC_Client::gRPC_Client() - : Client(m_buf, sizeof(m_buf)) +void MergeMiningClientTari::merge_mining_get_chain_id() { - m_buf[0] = '\0'; + struct Work + { + uv_work_t req; + MergeMiningClientTari* client; + }; + + Work* work = new Work{}; + work->req.data = work; + work->client = this; + + uv_queue_work(m_server->get_loop(), &work->req, + [](uv_work_t* req) + { + BACKGROUND_JOB_START(MergeMiningClientTari::merge_mining_get_chain_id); + + MergeMiningClientTari* client = reinterpret_cast(req->data)->client; + + grpc::Status status; + + NewBlockTemplateRequest request; + PowAlgo* algo = new PowAlgo(); + algo->set_pow_algo(PowAlgo_PowAlgos_POW_ALGOS_RANDOMX); + request.clear_algo(); + request.set_allocated_algo(algo); + request.set_max_weight(1); + + grpc::ClientContext ctx; + NewBlockTemplateResponse response; + status = client->m_TariNode->GetNewBlockTemplate(&ctx, request, &response); + + grpc::ClientContext ctx2; + GetNewBlockResult response2; + status = client->m_TariNode->GetNewBlock(&ctx2, response.new_block_template(), &response2); + + const std::string& id = response2.tari_unique_id(); + LOGINFO(1, client->m_hostStr << " uses chain_id " << log::LightCyan() << log::hex_buf(id.data(), id.size())); + + if (id.size() == HASH_SIZE) { + WriteLock lock(client->m_lock); + std::copy(id.begin(), id.end(), client->m_chainParams.aux_id.h); + } + else { + LOGERR(1, "Tari unique_id has invalid size (" << id.size() << ')'); + } + }, + [](uv_work_t* req, int /*status*/) + { + delete reinterpret_cast(req->data); + BACKGROUND_JOB_STOP(MergeMiningClientTari::merge_mining_get_chain_id); + }); } -void MergeMiningClientTari::gRPC_Client::reset() +// TariServer and TariClient are simply a proxy from a localhost TCP port to the external Tari node +// This is needed for SOCKS5 proxy support (gRPC library doesn't support it natively) + +MergeMiningClientTari::TariServer::TariServer(const std::string& socks5Proxy) + : TCPServer(1, MergeMiningClientTari::TariClient::allocate, socks5Proxy) + , m_TariNodeIsV6(false) + , m_TariNodeHost() + , m_TariNodePort(0) + , m_internalPort(0) { - m_data.clear(); + m_callbackBuf.resize(MergeMiningClientTari::BUF_SIZE); } -bool MergeMiningClientTari::gRPC_Client::on_connect() +bool MergeMiningClientTari::TariServer::start() { - const MergeMiningClientTari::gRPC_Server* server = static_cast(m_owner); - if (server) { - LOGINFO(4, "Connected to " << server->m_host << ':' << server->m_port); + std::random_device rd; + + for (size_t i = 0; i < 10; ++i) { + if (start_listening(false, "127.0.0.1", 49152 + (rd() % 16384))) { + break; + } } - return true; -} - -bool MergeMiningClientTari::gRPC_Client::on_read(char* data, uint32_t size) -{ - const MergeMiningClientTari::gRPC_Server* server = static_cast(m_owner); - if (server) { - LOGINFO(4, "Read " << size << " bytes from " << server->m_host << ':' << server->m_port); - LOGINFO(4, log::hex_buf(data, size)); + if (m_listenPort < 0) { + LOGERR(1, "failed to listen on TCP port"); + return false; } - m_data.insert(m_data.end(), data, data + size); - - return true; -} - -void MergeMiningClientTari::gRPC_Client::on_read_failed(int err) -{ - const MergeMiningClientTari::gRPC_Server* server = static_cast(m_owner); - if (server) { - LOGERR(1, "Read from " << server->m_host << ':' << server->m_port << "failed, error " << err); - } -} - -void MergeMiningClientTari::gRPC_Client::on_disconnected() -{ - const MergeMiningClientTari::gRPC_Server* server = static_cast(m_owner); - if (server) { - LOGINFO(4, "Disconnected from " << server->m_host << ':' << server->m_port); - } -} - -MergeMiningClientTari::gRPC_Server::gRPC_Server(const std::string& socks5Proxy) - : TCPServer(1, MergeMiningClientTari::gRPC_Client::allocate, socks5Proxy) - , m_port(0) -{ -} - -MergeMiningClientTari::gRPC_Server::~gRPC_Server() -{ -} - -bool MergeMiningClientTari::gRPC_Server::start(bool use_dns, const std::string& host, int port) -{ const int err = uv_thread_create(&m_loopThread, loop, this); if (err) { LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); @@ -151,30 +197,109 @@ bool MergeMiningClientTari::gRPC_Server::start(bool use_dns, const std::string& } m_loopThreadCreated = true; + return true; +} - m_host = host; - m_port = port; +bool MergeMiningClientTari::TariServer::connect_upstream(TariClient* downstream) +{ + const bool is_v6 = m_TariNodeIsV6; + const std::string& ip = m_TariNodeHost; + const int port = m_TariNodePort; - std::string ip = host; - bool is_v6 = host.find_first_of(':') != std::string::npos; + TariClient* upstream = static_cast(get_client()); - if (!use_dns || resolve_host(ip, is_v6)) { - if (!connect_to_peer(is_v6, ip.c_str(), port)) { - LOGERR(1, "Failed to connect to " << host << ':' << port); - return false; - } + upstream->m_owner = this; + upstream->m_port = port; + upstream->m_isV6 = is_v6; + + if (!str_to_ip(is_v6, ip.c_str(), upstream->m_addr)) { + return_client(upstream); + return false; + } + + log::Stream s(upstream->m_addrString); + if (is_v6) { + s << '[' << ip << "]:" << port << '\0'; + } + else { + s << ip << ':' << port << '\0'; + } + + if (!connect_to_peer(upstream)) { + return false; + } + + upstream->m_pairedClient = downstream; + upstream->m_pairedClientSavedResetCounter = downstream->m_resetCounter; + + downstream->m_pairedClient = upstream; + downstream->m_pairedClientSavedResetCounter = upstream->m_resetCounter; + + return true; +} + +void MergeMiningClientTari::TariServer::on_shutdown() +{ +} + +const char* MergeMiningClientTari::TariServer::get_log_category() const +{ + return log_category_prefix; +} + +MergeMiningClientTari::TariClient::TariClient() + : Client(m_buf, sizeof(m_buf)) + , m_pairedClient(nullptr) + , m_pairedClientSavedResetCounter(std::numeric_limits::max()) +{ + m_buf[0] = '\0'; +} + +void MergeMiningClientTari::TariClient::reset() +{ + if (is_paired()) { + m_pairedClient->m_pairedClient = nullptr; + m_pairedClient->close(); + m_pairedClient = nullptr; + } + m_pairedClientSavedResetCounter = std::numeric_limits::max(); +} + +bool MergeMiningClientTari::TariClient::on_connect() +{ + MergeMiningClientTari::TariServer* server = static_cast(m_owner); + if (!server) { + return false; + } + + if (m_isIncoming) { + return server->connect_upstream(this); } return true; } -void MergeMiningClientTari::gRPC_Server::on_shutdown() +bool MergeMiningClientTari::TariClient::on_read(char* data, uint32_t size) { -} + MergeMiningClientTari::TariServer* server = static_cast(m_owner); + if (!server) { + return false; + } -const char* MergeMiningClientTari::gRPC_Server::get_log_category() const -{ - return log_category_prefix; + if (!is_paired()) { + return false; + } + + return server->send(m_pairedClient, + [data, size](uint8_t* buf, size_t buf_size) -> size_t + { + if (size > buf_size) { + return 0U; + } + + memcpy(buf, data, size); + return size; + }); } } // namespace p2pool diff --git a/src/merge_mining_client_tari.h b/src/merge_mining_client_tari.h index 336216d..1fdbe72 100644 --- a/src/merge_mining_client_tari.h +++ b/src/merge_mining_client_tari.h @@ -18,6 +18,7 @@ #pragma once #include "tcp_server.h" +#include "Tari/proto.h" namespace p2pool { @@ -26,7 +27,7 @@ class p2pool; class MergeMiningClientTari : public IMergeMiningClient, public nocopy_nomove { public: - MergeMiningClientTari(p2pool* pool, const std::string& host, const std::string& wallet); + MergeMiningClientTari(p2pool* pool, std::string host, const std::string& wallet); ~MergeMiningClientTari(); bool get_params(ChainParameters& out_params) const override; @@ -35,48 +36,61 @@ public: static constexpr char TARI_PREFIX[] = "tari://"; private: - struct gRPC_Server : public TCPServer - { - explicit gRPC_Server(const std::string& socks5Proxy); - ~gRPC_Server(); - - [[nodiscard]] bool start(bool use_dns, const std::string& host, int port); - - void on_shutdown() override; - - [[nodiscard]] const char* get_log_category() const override; - - std::string m_host; - int m_port; - } *m_server; - - struct gRPC_Client : public TCPServer::Client - { - gRPC_Client(); - ~gRPC_Client() {} - - static Client* allocate() { return new gRPC_Client(); } - virtual size_t size() const override { return sizeof(gRPC_Client); } - - void reset() override; - [[nodiscard]] bool on_connect() override; - [[nodiscard]] bool on_read(char* data, uint32_t size) override; - void on_read_failed(int err) override; - void on_disconnected() override; - - char m_buf[1024]; - std::vector m_data; - }; - - std::string m_host; - uint32_t m_port; + void merge_mining_get_chain_id(); mutable uv_rwlock_t m_lock; ChainParameters m_chainParams; std::string m_auxWallet; - p2pool* m_pool; + +private: + static constexpr uint64_t BUF_SIZE = 16384; + + struct TariClient; + + struct TariServer : public TCPServer + { + explicit TariServer(const std::string& socks5Proxy); + ~TariServer() {} + + [[nodiscard]] bool start(); + [[nodiscard]] bool connect_upstream(TariClient* downstream); + + void on_shutdown() override; + + [[nodiscard]] const char* get_log_category() const override; + + bool m_TariNodeIsV6; + std::string m_TariNodeHost; + int m_TariNodePort; + + int m_internalPort; + } *m_server; + + const std::string m_hostStr; + + tari::rpc::BaseNode::Stub* m_TariNode; + + struct TariClient : public TCPServer::Client + { + TariClient(); + ~TariClient() {} + + static Client* allocate() { return new TariClient(); } + virtual size_t size() const override { return sizeof(TariClient); } + + void reset() override; + [[nodiscard]] bool on_connect() override; + [[nodiscard]] bool on_read(char* data, uint32_t size) override; + + char m_buf[BUF_SIZE]; + + bool is_paired() const { return m_pairedClient && (m_pairedClient->m_resetCounter == m_pairedClientSavedResetCounter); } + + TariClient* m_pairedClient; + uint32_t m_pairedClientSavedResetCounter; + }; }; } // namespace p2pool diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 873fc6b..23aec70 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -42,12 +42,6 @@ #include #include -#define GRPC_TEST 0 - -#if GRPC_TEST -#include "Tari/proto.h" -#endif - LOG_CATEGORY(P2Pool) constexpr int BLOCK_HEADERS_REQUIRED = 720; @@ -67,34 +61,6 @@ p2pool::p2pool(int argc, char* argv[]) , m_startTime(seconds_since_epoch()) , m_lastMinerDataReceived(0) { -#if GRPC_TEST - { - using namespace tari::rpc; - BaseNode::Stub stub(grpc::CreateChannel("127.0.0.1:18142", grpc::InsecureChannelCredentials())); - - grpc::Status status; - - NewBlockTemplateResponse response; - { - grpc::ClientContext context; - NewBlockTemplateRequest request; - PowAlgo* algo = new PowAlgo(); - algo->set_pow_algo(PowAlgo_PowAlgos_POW_ALGOS_RANDOMX); - request.clear_algo(); - request.set_allocated_algo(algo); - request.set_max_weight(1); - status = stub.GetNewBlockTemplate(&context, request, &response); - } - - GetNewBlockResult response2; - grpc::ClientContext context2; - status = stub.GetNewBlock(&context2, response.new_block_template(), &response2); - - const std::string& s = response2.tari_unique_id(); - LOGINFO(0, "Tari unique_id = " << log::hex_buf(s.data(), s.size())); - } -#endif - LOGINFO(1, log::LightCyan() << VERSION); Params* p = new Params(argc, argv); diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index bc41c47..fb3df62 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -554,9 +554,9 @@ bool TCPServer::send_internal(Client* client, Callback PANIC_STOP(); } - if (bytes_written == 0) { - LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); - return true; + if (!bytes_written) { + LOGWARN(1, "send callback failed"); + return false; } WriteBuf* buf = get_write_buffer(bytes_written); diff --git a/src/tcp_server.h b/src/tcp_server.h index 83dc65f..372854f 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -42,6 +42,8 @@ public: [[nodiscard]] virtual int external_listen_port() const { return m_listenPort; } [[nodiscard]] bool connect_to_peer(bool is_v6, const raw_ip& ip, int port); + [[nodiscard]] bool connect_to_peer(Client* client); + virtual void on_connect_failed(bool /*is_v6*/, const raw_ip& /*ip*/, int /*port*/) {} void ban(bool is_v6, raw_ip ip, uint64_t seconds); @@ -136,8 +138,6 @@ private: void on_new_client(uv_stream_t* server); void on_new_client(uv_stream_t* server, Client* client); - [[nodiscard]] bool connect_to_peer(Client* client); - [[nodiscard]] bool send_internal(Client* client, Callback::Base&& callback); allocate_client_callback m_allocateNewClient;