Tari: connect to a node and get unique_id

This commit is contained in:
SChernykh 2024-02-15 17:43:15 +01:00
parent cf165af025
commit dbb21151b0
9 changed files with 281 additions and 166 deletions

View file

@ -424,9 +424,10 @@ jobs:
strategy:
matrix:
config:
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "ON", upnp: "ON"}
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "ON"}
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "OFF"}
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "ON", upnp: "ON", grpc: "ON"}
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "ON", upnp: "ON", grpc: "OFF"}
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "ON", grpc: "OFF"}
- {vs: Visual Studio 16 2019, os: 2019, vspath: "C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\Enterprise", rx: "OFF", upnp: "OFF", grpc: "OFF"}
steps:
- name: Checkout repository
@ -441,7 +442,7 @@ jobs:
run: |
mkdir build
cd build
cmake .. -G "${{ matrix.config.vs }}" -DCMAKE_SYSTEM_VERSION="10.0" -DWITH_RANDOMX=${{ matrix.config.rx }} -DWITH_UPNP=${{ matrix.config.upnp }}
cmake .. -G "${{ matrix.config.vs }}" -DCMAKE_SYSTEM_VERSION="10.0" -DWITH_RANDOMX=${{ matrix.config.rx }} -DWITH_UPNP=${{ matrix.config.upnp }} -DWITH_GRPC=${{ matrix.config.grpc }}
& "${{ matrix.config.vspath }}\\MSBuild\\Current\\Bin\\amd64\\msbuild" -v:m /m /p:Configuration=Release p2pool.vcxproj
- name: Check Windows 7 compatibility
@ -477,7 +478,7 @@ jobs:
- name: Archive binary
uses: actions/upload-artifact@v4
with:
name: p2pool-vs-${{ matrix.config.os }}-randomx-${{ matrix.config.rx }}-upnp-${{ matrix.config.upnp }}.exe
name: p2pool-vs-${{ matrix.config.os }}-randomx-${{ matrix.config.rx }}-upnp-${{ matrix.config.upnp }}-grpc-${{ matrix.config.grpc }}.exe
path: build/Release/p2pool.exe
build-macos:

View file

@ -16,6 +16,7 @@ option(STATIC_LIBS "Link libuv and libzmq statically" OFF)
option(WITH_RANDOMX "Include the RandomX library in the build. If this is turned off, p2pool will rely on monerod for verifying RandomX hashes" ON)
option(WITH_LTO "Use link-time compiler optimization (if linking fails for you, run cmake with -DWITH_LTO=OFF)" ON)
option(WITH_UPNP "Include UPnP support. If this is turned off, p2pool will not be able to configure port forwarding on UPnP-enabled routers." ON)
option(WITH_GRPC "Include gRPC support. If this is turned off, p2pool will not be able to merge mine with Tari." ON)
option(DEV_TEST_SYNC "[Developer only] Sync test, stop p2pool after sync is complete" OFF)
option(DEV_WITH_TSAN "[Developer only] Compile with thread sanitizer" OFF)
@ -31,10 +32,13 @@ if (CMAKE_CXX_COMPILER_ID MATCHES MSVC)
set(CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION 10.0)
endif()
include(cmake/grpc.cmake)
if (WITH_GRPC)
add_definitions(-DWITH_GRPC)
include(cmake/grpc.cmake)
add_subdirectory(external/src/Tari)
set(LIBS ${LIBS} Tari_gRPC)
add_subdirectory(external/src/Tari)
set(LIBS ${LIBS} Tari_gRPC)
endif()
if (WITH_RANDOMX)
add_definitions(-DWITH_RANDOMX)
@ -94,7 +98,6 @@ set(HEADERS
src/mempool.h
src/merge_mining_client.h
src/merge_mining_client_json_rpc.h
src/merge_mining_client_tari.h
src/merkle.h
src/p2p_server.h
src/p2pool.h
@ -129,7 +132,6 @@ set(SOURCES
src/mempool.cpp
src/merge_mining_client.cpp
src/merge_mining_client_json_rpc.cpp
src/merge_mining_client_tari.cpp
src/merkle.cpp
src/p2p_server.cpp
src/p2pool.cpp
@ -150,6 +152,11 @@ if (WITH_RANDOMX)
set(SOURCES ${SOURCES} src/miner.cpp)
endif()
if (WITH_GRPC)
set(HEADERS ${HEADERS} src/merge_mining_client_tari.h)
set(SOURCES ${SOURCES} src/merge_mining_client_tari.cpp)
endif()
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" PREFIX "Header Files" FILES ${HEADERS})
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" PREFIX "Source Files" FILES ${SOURCES})
@ -379,7 +386,9 @@ if (STATIC_BINARY OR STATIC_LIBS)
set(STATIC_LIBS ${STATIC_LIBS} resolv)
endif()
if (WITH_GRPC)
set(STATIC_LIBS ${STATIC_LIBS} Tari_gRPC grpc grpc++ libprotobuf)
endif()
target_link_libraries(${CMAKE_PROJECT_NAME}
"${CMAKE_SOURCE_DIR}/external/src/libzmq/build/lib/libzmq.a"

View file

@ -38,7 +38,7 @@ include_directories(../grpc/third_party/abseil-cpp)
if (CMAKE_CXX_COMPILER_ID MATCHES MSVC)
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /W0 /Zi /Od /Ob0 /MP /MTd")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /W0 /Zi /Od /Ob0 /MP /MTd")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /W0 /Zi /Od /Ob0 /MP /MTd")
set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} /W0 /O1 /Ob2 /Oi /Os /Oy /MP /GL /MT")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /W0 /O1 /Ob2 /Oi /Os /Oy /MP /GL /MT")
else()

View file

@ -19,7 +19,7 @@
#include "merge_mining_client.h"
#include "merge_mining_client_json_rpc.h"
#ifndef P2POOL_UNIT_TESTS
#if defined(WITH_GRPC) && !defined(P2POOL_UNIT_TESTS)
#include "merge_mining_client_tari.h"
#endif
@ -28,7 +28,7 @@ namespace p2pool {
IMergeMiningClient* IMergeMiningClient::create(p2pool* pool, const std::string& host, const std::string& wallet) noexcept
{
try {
#ifndef P2POOL_UNIT_TESTS
#if defined(WITH_GRPC) && !defined(P2POOL_UNIT_TESTS)
if (host.find(MergeMiningClientTari::TARI_PREFIX) == 0) {
return new MergeMiningClientTari(pool, host, wallet);
}

View file

@ -20,40 +20,64 @@
#include "merge_mining_client_tari.h"
#include "p2pool.h"
#include "params.h"
#include "Tari/proto.h"
LOG_CATEGORY(MergeMiningClientTari)
using namespace tari::rpc;
namespace p2pool {
MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, const std::string& host, const std::string& wallet)
: m_server(new gRPC_Server(pool->params().m_socks5Proxy))
, m_host(host)
, m_port(0)
MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, const std::string& wallet)
: m_chainParams{}
, m_auxWallet(wallet)
, m_pool(pool)
, m_server(new TariServer(pool->params().m_socks5Proxy))
, m_hostStr(host)
{
if (host.find(TARI_PREFIX) != 0) {
LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found");
throw std::exception();
}
const size_t k = host.find_last_of(':');
if (k != std::string::npos) {
m_host = host.substr(sizeof(TARI_PREFIX) - 1, k - (sizeof(TARI_PREFIX) - 1));
m_port = std::stoul(host.substr(k + 1), nullptr, 10);
host.erase(0, sizeof(TARI_PREFIX) - 1);
while (host.back() == '/') {
host.pop_back();
}
if (m_host.empty() || (m_port == 0) || (m_port >= 65536)) {
if (host.empty()) {
LOGERR(1, "Invalid host");
throw std::exception();
}
m_server->parse_address_list(host,
[this](bool is_v6, const std::string& /*address*/, std::string ip, int port)
{
if (!m_pool->params().m_dns || resolve_host(ip, is_v6)) {
m_server->m_TariNodeIsV6 = is_v6;
m_server->m_TariNodeHost = ip;
m_server->m_TariNodePort = port;
}
});
if (m_server->m_TariNodeHost.empty() || (m_server->m_TariNodePort == 0) || (m_server->m_TariNodePort >= 65536)) {
LOGERR(1, "Invalid host " << host);
throw std::exception();
}
uv_rwlock_init_checked(&m_lock);
if (!m_server->start(m_pool->params().m_dns, m_host, m_port)) {
if (!m_server->start()) {
throw std::exception();
}
char buf[32] = {};
log::Stream s(buf);
s << "127.0.0.1:" << m_server->external_listen_port();
m_TariNode = new BaseNode::Stub(grpc::CreateChannel(buf, grpc::InsecureChannelCredentials()));
merge_mining_get_chain_id();
}
MergeMiningClientTari::~MergeMiningClientTari()
@ -61,6 +85,8 @@ MergeMiningClientTari::~MergeMiningClientTari()
m_server->shutdown_tcp();
delete m_server;
delete m_TariNode;
LOGINFO(1, "stopped");
}
@ -82,68 +108,88 @@ void MergeMiningClientTari::submit_solution(const std::vector<uint8_t>& blob, co
(void)merkle_proof;
}
MergeMiningClientTari::gRPC_Client::gRPC_Client()
: Client(m_buf, sizeof(m_buf))
void MergeMiningClientTari::merge_mining_get_chain_id()
{
m_buf[0] = '\0';
struct Work
{
uv_work_t req;
MergeMiningClientTari* client;
};
Work* work = new Work{};
work->req.data = work;
work->client = this;
uv_queue_work(m_server->get_loop(), &work->req,
[](uv_work_t* req)
{
BACKGROUND_JOB_START(MergeMiningClientTari::merge_mining_get_chain_id);
MergeMiningClientTari* client = reinterpret_cast<Work*>(req->data)->client;
grpc::Status status;
NewBlockTemplateRequest request;
PowAlgo* algo = new PowAlgo();
algo->set_pow_algo(PowAlgo_PowAlgos_POW_ALGOS_RANDOMX);
request.clear_algo();
request.set_allocated_algo(algo);
request.set_max_weight(1);
grpc::ClientContext ctx;
NewBlockTemplateResponse response;
status = client->m_TariNode->GetNewBlockTemplate(&ctx, request, &response);
grpc::ClientContext ctx2;
GetNewBlockResult response2;
status = client->m_TariNode->GetNewBlock(&ctx2, response.new_block_template(), &response2);
const std::string& id = response2.tari_unique_id();
LOGINFO(1, client->m_hostStr << " uses chain_id " << log::LightCyan() << log::hex_buf(id.data(), id.size()));
if (id.size() == HASH_SIZE) {
WriteLock lock(client->m_lock);
std::copy(id.begin(), id.end(), client->m_chainParams.aux_id.h);
}
else {
LOGERR(1, "Tari unique_id has invalid size (" << id.size() << ')');
}
},
[](uv_work_t* req, int /*status*/)
{
delete reinterpret_cast<Work*>(req->data);
BACKGROUND_JOB_STOP(MergeMiningClientTari::merge_mining_get_chain_id);
});
}
void MergeMiningClientTari::gRPC_Client::reset()
// TariServer and TariClient are simply a proxy from a localhost TCP port to the external Tari node
// This is needed for SOCKS5 proxy support (gRPC library doesn't support it natively)
MergeMiningClientTari::TariServer::TariServer(const std::string& socks5Proxy)
: TCPServer(1, MergeMiningClientTari::TariClient::allocate, socks5Proxy)
, m_TariNodeIsV6(false)
, m_TariNodeHost()
, m_TariNodePort(0)
, m_internalPort(0)
{
m_data.clear();
m_callbackBuf.resize(MergeMiningClientTari::BUF_SIZE);
}
bool MergeMiningClientTari::gRPC_Client::on_connect()
bool MergeMiningClientTari::TariServer::start()
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGINFO(4, "Connected to " << server->m_host << ':' << server->m_port);
std::random_device rd;
for (size_t i = 0; i < 10; ++i) {
if (start_listening(false, "127.0.0.1", 49152 + (rd() % 16384))) {
break;
}
}
return true;
}
bool MergeMiningClientTari::gRPC_Client::on_read(char* data, uint32_t size)
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGINFO(4, "Read " << size << " bytes from " << server->m_host << ':' << server->m_port);
LOGINFO(4, log::hex_buf(data, size));
if (m_listenPort < 0) {
LOGERR(1, "failed to listen on TCP port");
return false;
}
m_data.insert(m_data.end(), data, data + size);
return true;
}
void MergeMiningClientTari::gRPC_Client::on_read_failed(int err)
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGERR(1, "Read from " << server->m_host << ':' << server->m_port << "failed, error " << err);
}
}
void MergeMiningClientTari::gRPC_Client::on_disconnected()
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGINFO(4, "Disconnected from " << server->m_host << ':' << server->m_port);
}
}
MergeMiningClientTari::gRPC_Server::gRPC_Server(const std::string& socks5Proxy)
: TCPServer(1, MergeMiningClientTari::gRPC_Client::allocate, socks5Proxy)
, m_port(0)
{
}
MergeMiningClientTari::gRPC_Server::~gRPC_Server()
{
}
bool MergeMiningClientTari::gRPC_Server::start(bool use_dns, const std::string& host, int port)
{
const int err = uv_thread_create(&m_loopThread, loop, this);
if (err) {
LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err));
@ -151,30 +197,109 @@ bool MergeMiningClientTari::gRPC_Server::start(bool use_dns, const std::string&
}
m_loopThreadCreated = true;
return true;
}
m_host = host;
m_port = port;
bool MergeMiningClientTari::TariServer::connect_upstream(TariClient* downstream)
{
const bool is_v6 = m_TariNodeIsV6;
const std::string& ip = m_TariNodeHost;
const int port = m_TariNodePort;
std::string ip = host;
bool is_v6 = host.find_first_of(':') != std::string::npos;
TariClient* upstream = static_cast<TariClient*>(get_client());
if (!use_dns || resolve_host(ip, is_v6)) {
if (!connect_to_peer(is_v6, ip.c_str(), port)) {
LOGERR(1, "Failed to connect to " << host << ':' << port);
upstream->m_owner = this;
upstream->m_port = port;
upstream->m_isV6 = is_v6;
if (!str_to_ip(is_v6, ip.c_str(), upstream->m_addr)) {
return_client(upstream);
return false;
}
log::Stream s(upstream->m_addrString);
if (is_v6) {
s << '[' << ip << "]:" << port << '\0';
}
else {
s << ip << ':' << port << '\0';
}
if (!connect_to_peer(upstream)) {
return false;
}
upstream->m_pairedClient = downstream;
upstream->m_pairedClientSavedResetCounter = downstream->m_resetCounter;
downstream->m_pairedClient = upstream;
downstream->m_pairedClientSavedResetCounter = upstream->m_resetCounter;
return true;
}
void MergeMiningClientTari::TariServer::on_shutdown()
{
}
const char* MergeMiningClientTari::TariServer::get_log_category() const
{
return log_category_prefix;
}
MergeMiningClientTari::TariClient::TariClient()
: Client(m_buf, sizeof(m_buf))
, m_pairedClient(nullptr)
, m_pairedClientSavedResetCounter(std::numeric_limits<uint32_t>::max())
{
m_buf[0] = '\0';
}
void MergeMiningClientTari::TariClient::reset()
{
if (is_paired()) {
m_pairedClient->m_pairedClient = nullptr;
m_pairedClient->close();
m_pairedClient = nullptr;
}
m_pairedClientSavedResetCounter = std::numeric_limits<uint32_t>::max();
}
bool MergeMiningClientTari::TariClient::on_connect()
{
MergeMiningClientTari::TariServer* server = static_cast<MergeMiningClientTari::TariServer*>(m_owner);
if (!server) {
return false;
}
if (m_isIncoming) {
return server->connect_upstream(this);
}
return true;
}
void MergeMiningClientTari::gRPC_Server::on_shutdown()
bool MergeMiningClientTari::TariClient::on_read(char* data, uint32_t size)
{
}
MergeMiningClientTari::TariServer* server = static_cast<MergeMiningClientTari::TariServer*>(m_owner);
if (!server) {
return false;
}
const char* MergeMiningClientTari::gRPC_Server::get_log_category() const
{
return log_category_prefix;
if (!is_paired()) {
return false;
}
return server->send(m_pairedClient,
[data, size](uint8_t* buf, size_t buf_size) -> size_t
{
if (size > buf_size) {
return 0U;
}
memcpy(buf, data, size);
return size;
});
}
} // namespace p2pool

View file

@ -18,6 +18,7 @@
#pragma once
#include "tcp_server.h"
#include "Tari/proto.h"
namespace p2pool {
@ -26,7 +27,7 @@ class p2pool;
class MergeMiningClientTari : public IMergeMiningClient, public nocopy_nomove
{
public:
MergeMiningClientTari(p2pool* pool, const std::string& host, const std::string& wallet);
MergeMiningClientTari(p2pool* pool, std::string host, const std::string& wallet);
~MergeMiningClientTari();
bool get_params(ChainParameters& out_params) const override;
@ -35,48 +36,61 @@ public:
static constexpr char TARI_PREFIX[] = "tari://";
private:
struct gRPC_Server : public TCPServer
{
explicit gRPC_Server(const std::string& socks5Proxy);
~gRPC_Server();
[[nodiscard]] bool start(bool use_dns, const std::string& host, int port);
void on_shutdown() override;
[[nodiscard]] const char* get_log_category() const override;
std::string m_host;
int m_port;
} *m_server;
struct gRPC_Client : public TCPServer::Client
{
gRPC_Client();
~gRPC_Client() {}
static Client* allocate() { return new gRPC_Client(); }
virtual size_t size() const override { return sizeof(gRPC_Client); }
void reset() override;
[[nodiscard]] bool on_connect() override;
[[nodiscard]] bool on_read(char* data, uint32_t size) override;
void on_read_failed(int err) override;
void on_disconnected() override;
char m_buf[1024];
std::vector<char> m_data;
};
std::string m_host;
uint32_t m_port;
void merge_mining_get_chain_id();
mutable uv_rwlock_t m_lock;
ChainParameters m_chainParams;
std::string m_auxWallet;
p2pool* m_pool;
private:
static constexpr uint64_t BUF_SIZE = 16384;
struct TariClient;
struct TariServer : public TCPServer
{
explicit TariServer(const std::string& socks5Proxy);
~TariServer() {}
[[nodiscard]] bool start();
[[nodiscard]] bool connect_upstream(TariClient* downstream);
void on_shutdown() override;
[[nodiscard]] const char* get_log_category() const override;
bool m_TariNodeIsV6;
std::string m_TariNodeHost;
int m_TariNodePort;
int m_internalPort;
} *m_server;
const std::string m_hostStr;
tari::rpc::BaseNode::Stub* m_TariNode;
struct TariClient : public TCPServer::Client
{
TariClient();
~TariClient() {}
static Client* allocate() { return new TariClient(); }
virtual size_t size() const override { return sizeof(TariClient); }
void reset() override;
[[nodiscard]] bool on_connect() override;
[[nodiscard]] bool on_read(char* data, uint32_t size) override;
char m_buf[BUF_SIZE];
bool is_paired() const { return m_pairedClient && (m_pairedClient->m_resetCounter == m_pairedClientSavedResetCounter); }
TariClient* m_pairedClient;
uint32_t m_pairedClientSavedResetCounter;
};
};
} // namespace p2pool

View file

@ -42,12 +42,6 @@
#include <fstream>
#include <numeric>
#define GRPC_TEST 0
#if GRPC_TEST
#include "Tari/proto.h"
#endif
LOG_CATEGORY(P2Pool)
constexpr int BLOCK_HEADERS_REQUIRED = 720;
@ -67,34 +61,6 @@ p2pool::p2pool(int argc, char* argv[])
, m_startTime(seconds_since_epoch())
, m_lastMinerDataReceived(0)
{
#if GRPC_TEST
{
using namespace tari::rpc;
BaseNode::Stub stub(grpc::CreateChannel("127.0.0.1:18142", grpc::InsecureChannelCredentials()));
grpc::Status status;
NewBlockTemplateResponse response;
{
grpc::ClientContext context;
NewBlockTemplateRequest request;
PowAlgo* algo = new PowAlgo();
algo->set_pow_algo(PowAlgo_PowAlgos_POW_ALGOS_RANDOMX);
request.clear_algo();
request.set_allocated_algo(algo);
request.set_max_weight(1);
status = stub.GetNewBlockTemplate(&context, request, &response);
}
GetNewBlockResult response2;
grpc::ClientContext context2;
status = stub.GetNewBlock(&context2, response.new_block_template(), &response2);
const std::string& s = response2.tari_unique_id();
LOGINFO(0, "Tari unique_id = " << log::hex_buf(s.data(), s.size()));
}
#endif
LOGINFO(1, log::LightCyan() << VERSION);
Params* p = new Params(argc, argv);

View file

@ -554,9 +554,9 @@ bool TCPServer::send_internal(Client* client, Callback<size_t, uint8_t*, size_t>
PANIC_STOP();
}
if (bytes_written == 0) {
LOGWARN(1, "send callback wrote 0 bytes, nothing to do");
return true;
if (!bytes_written) {
LOGWARN(1, "send callback failed");
return false;
}
WriteBuf* buf = get_write_buffer(bytes_written);

View file

@ -42,6 +42,8 @@ public:
[[nodiscard]] virtual int external_listen_port() const { return m_listenPort; }
[[nodiscard]] bool connect_to_peer(bool is_v6, const raw_ip& ip, int port);
[[nodiscard]] bool connect_to_peer(Client* client);
virtual void on_connect_failed(bool /*is_v6*/, const raw_ip& /*ip*/, int /*port*/) {}
void ban(bool is_v6, raw_ip ip, uint64_t seconds);
@ -136,8 +138,6 @@ private:
void on_new_client(uv_stream_t* server);
void on_new_client(uv_stream_t* server, Client* client);
[[nodiscard]] bool connect_to_peer(Client* client);
[[nodiscard]] bool send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback);
allocate_client_callback m_allocateNewClient;