mirror of
https://github.com/SChernykh/p2pool.git
synced 2024-12-22 19:39:22 +00:00
Added Tari node polling
This commit is contained in:
parent
bbfe3d84b7
commit
785aaec4c3
2 changed files with 101 additions and 51 deletions
|
@ -33,6 +33,7 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con
|
|||
, m_pool(pool)
|
||||
, m_server(new TariServer(pool->params().m_socks5Proxy))
|
||||
, m_hostStr(host)
|
||||
, m_workerStop(0)
|
||||
{
|
||||
if (host.find(TARI_PREFIX) != 0) {
|
||||
LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found");
|
||||
|
@ -65,7 +66,7 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con
|
|||
throw std::exception();
|
||||
}
|
||||
|
||||
uv_rwlock_init_checked(&m_lock);
|
||||
uv_rwlock_init_checked(&m_chainParamsLock);
|
||||
|
||||
if (!m_server->start()) {
|
||||
throw std::exception();
|
||||
|
@ -77,22 +78,40 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con
|
|||
|
||||
m_TariNode = new BaseNode::Stub(grpc::CreateChannel(buf, grpc::InsecureChannelCredentials()));
|
||||
|
||||
merge_mining_get_chain_id();
|
||||
uv_mutex_init_checked(&m_workerLock);
|
||||
uv_cond_init_checked(&m_workerCond);
|
||||
|
||||
const int err = uv_thread_create(&m_worker, run_wrapper, this);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to start worker thread, error " << uv_err_name(err));
|
||||
throw std::exception();
|
||||
}
|
||||
}
|
||||
|
||||
MergeMiningClientTari::~MergeMiningClientTari()
|
||||
{
|
||||
LOGINFO(1, "stopping");
|
||||
|
||||
m_workerStop.exchange(1);
|
||||
uv_cond_signal(&m_workerCond);
|
||||
uv_thread_join(&m_worker);
|
||||
|
||||
m_server->shutdown_tcp();
|
||||
delete m_server;
|
||||
|
||||
delete m_TariNode;
|
||||
|
||||
uv_rwlock_destroy(&m_chainParamsLock);
|
||||
|
||||
uv_mutex_destroy(&m_workerLock);
|
||||
uv_cond_destroy(&m_workerCond);
|
||||
|
||||
LOGINFO(1, "stopped");
|
||||
}
|
||||
|
||||
bool MergeMiningClientTari::get_params(ChainParameters& out_params) const
|
||||
{
|
||||
ReadLock lock(m_lock);
|
||||
ReadLock lock(m_chainParamsLock);
|
||||
|
||||
if (m_chainParams.aux_id.empty() || m_chainParams.aux_diff.empty()) {
|
||||
return false;
|
||||
|
@ -108,24 +127,20 @@ void MergeMiningClientTari::submit_solution(const std::vector<uint8_t>& blob, co
|
|||
(void)merkle_proof;
|
||||
}
|
||||
|
||||
void MergeMiningClientTari::merge_mining_get_chain_id()
|
||||
void MergeMiningClientTari::run_wrapper(void* arg)
|
||||
{
|
||||
struct Work
|
||||
reinterpret_cast<MergeMiningClientTari*>(arg)->run();
|
||||
LOGINFO(1, "worker thread stopped");
|
||||
}
|
||||
|
||||
void MergeMiningClientTari::run()
|
||||
{
|
||||
uv_work_t req;
|
||||
MergeMiningClientTari* client;
|
||||
};
|
||||
LOGINFO(1, "worker thread ready");
|
||||
|
||||
Work* work = new Work{};
|
||||
work->req.data = work;
|
||||
work->client = this;
|
||||
do {
|
||||
MutexLock lock(m_workerLock);
|
||||
|
||||
uv_queue_work(m_server->get_loop(), &work->req,
|
||||
[](uv_work_t* req)
|
||||
{
|
||||
BACKGROUND_JOB_START(MergeMiningClientTari::merge_mining_get_chain_id);
|
||||
|
||||
MergeMiningClientTari* client = reinterpret_cast<Work*>(req->data)->client;
|
||||
LOGINFO(6, "Getting new block template from Tari node");
|
||||
|
||||
grpc::Status status;
|
||||
|
||||
|
@ -138,28 +153,33 @@ void MergeMiningClientTari::merge_mining_get_chain_id()
|
|||
|
||||
grpc::ClientContext ctx;
|
||||
NewBlockTemplateResponse response;
|
||||
status = client->m_TariNode->GetNewBlockTemplate(&ctx, request, &response);
|
||||
status = m_TariNode->GetNewBlockTemplate(&ctx, request, &response);
|
||||
|
||||
grpc::ClientContext ctx2;
|
||||
GetNewBlockResult response2;
|
||||
status = client->m_TariNode->GetNewBlock(&ctx2, response.new_block_template(), &response2);
|
||||
status = m_TariNode->GetNewBlock(&ctx2, response.new_block_template(), &response2);
|
||||
|
||||
bool aux_id_empty;
|
||||
{
|
||||
ReadLock lock2(m_chainParamsLock);
|
||||
aux_id_empty = m_chainParams.aux_id.empty();
|
||||
}
|
||||
|
||||
if (aux_id_empty) {
|
||||
const std::string& id = response2.tari_unique_id();
|
||||
LOGINFO(1, client->m_hostStr << " uses chain_id " << log::LightCyan() << log::hex_buf(id.data(), id.size()));
|
||||
LOGINFO(1, m_hostStr << " uses chain_id " << log::LightCyan() << log::hex_buf(id.data(), id.size()));
|
||||
|
||||
if (id.size() == HASH_SIZE) {
|
||||
WriteLock lock(client->m_lock);
|
||||
std::copy(id.begin(), id.end(), client->m_chainParams.aux_id.h);
|
||||
WriteLock lock2(m_chainParamsLock);
|
||||
std::copy(id.begin(), id.end(), m_chainParams.aux_id.h);
|
||||
}
|
||||
else {
|
||||
LOGERR(1, "Tari unique_id has invalid size (" << id.size() << ')');
|
||||
}
|
||||
},
|
||||
[](uv_work_t* req, int /*status*/)
|
||||
{
|
||||
delete reinterpret_cast<Work*>(req->data);
|
||||
BACKGROUND_JOB_STOP(MergeMiningClientTari::merge_mining_get_chain_id);
|
||||
});
|
||||
}
|
||||
|
||||
LOGINFO(6, "Tari height = " << response2.block().header().height());
|
||||
} while ((uv_cond_timedwait(&m_workerCond, &m_workerLock, 500'000'000) == UV_ETIMEDOUT) && (m_workerStop.load() == 0));
|
||||
}
|
||||
|
||||
// TariServer and TariClient are simply a proxy from a localhost TCP port to the external Tari node
|
||||
|
@ -232,9 +252,6 @@ bool MergeMiningClientTari::TariServer::connect_upstream(TariClient* downstream)
|
|||
upstream->m_pairedClient = downstream;
|
||||
upstream->m_pairedClientSavedResetCounter = downstream->m_resetCounter;
|
||||
|
||||
downstream->m_pairedClient = upstream;
|
||||
downstream->m_pairedClientSavedResetCounter = upstream->m_resetCounter;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -275,6 +292,29 @@ bool MergeMiningClientTari::TariClient::on_connect()
|
|||
if (m_isIncoming) {
|
||||
return server->connect_upstream(this);
|
||||
}
|
||||
else {
|
||||
TariClient* downstream = m_pairedClient;
|
||||
downstream->m_pairedClient = this;
|
||||
downstream->m_pairedClientSavedResetCounter = m_resetCounter;
|
||||
|
||||
const std::vector<uint8_t>& v = downstream->m_pendingData;
|
||||
|
||||
if (!v.empty()) {
|
||||
const bool result = server->send(this,
|
||||
[&v](uint8_t* buf, size_t buf_size) -> size_t
|
||||
{
|
||||
if (v.size() > buf_size) {
|
||||
return 0U;
|
||||
}
|
||||
|
||||
std::copy(v.begin(), v.end(), buf);
|
||||
return v.size();
|
||||
});
|
||||
|
||||
downstream->m_pendingData.clear();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -287,7 +327,9 @@ bool MergeMiningClientTari::TariClient::on_read(char* data, uint32_t size)
|
|||
}
|
||||
|
||||
if (!is_paired()) {
|
||||
return false;
|
||||
LOGWARN(5, "Read " << size << " bytes from " << static_cast<char*>(m_addrString) << " but it's not paired yet. Buffering it.");
|
||||
m_pendingData.insert(m_pendingData.end(), data, data + size);
|
||||
return true;
|
||||
}
|
||||
|
||||
return server->send(m_pairedClient,
|
||||
|
@ -297,7 +339,7 @@ bool MergeMiningClientTari::TariClient::on_read(char* data, uint32_t size)
|
|||
return 0U;
|
||||
}
|
||||
|
||||
memcpy(buf, data, size);
|
||||
std::copy(data, data + size, buf);
|
||||
return size;
|
||||
});
|
||||
}
|
||||
|
|
|
@ -36,9 +36,7 @@ public:
|
|||
static constexpr char TARI_PREFIX[] = "tari://";
|
||||
|
||||
private:
|
||||
void merge_mining_get_chain_id();
|
||||
|
||||
mutable uv_rwlock_t m_lock;
|
||||
mutable uv_rwlock_t m_chainParamsLock;
|
||||
ChainParameters m_chainParams;
|
||||
|
||||
std::string m_auxWallet;
|
||||
|
@ -85,12 +83,22 @@ private:
|
|||
[[nodiscard]] bool on_read(char* data, uint32_t size) override;
|
||||
|
||||
char m_buf[BUF_SIZE];
|
||||
std::vector<uint8_t> m_pendingData;
|
||||
|
||||
bool is_paired() const { return m_pairedClient && (m_pairedClient->m_resetCounter == m_pairedClientSavedResetCounter); }
|
||||
|
||||
TariClient* m_pairedClient;
|
||||
uint32_t m_pairedClientSavedResetCounter;
|
||||
};
|
||||
|
||||
uv_thread_t m_worker;
|
||||
|
||||
uv_mutex_t m_workerLock;
|
||||
uv_cond_t m_workerCond;
|
||||
std::atomic<uint32_t> m_workerStop;
|
||||
|
||||
static void run_wrapper(void* arg);
|
||||
void run();
|
||||
};
|
||||
|
||||
} // namespace p2pool
|
||||
|
|
Loading…
Reference in a new issue