Optimized SideChain::get_outputs_blob()

This commit is contained in:
SChernykh 2023-01-15 21:29:35 +01:00
parent d4329ae594
commit e828709090

View file

@ -726,7 +726,18 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
{ {
blob.clear(); blob.clear();
std::vector<MinerShare> tmpShares; struct Data
{
FORCEINLINE Data() : counter(0) {}
Data(Data&&) = delete;
Data& operator=(Data&&) = delete;
std::vector<MinerShare> tmpShares;
hash txkeySec;
std::atomic<int> counter;
};
std::shared_ptr<Data> data;
std::vector<uint64_t> tmpRewards; std::vector<uint64_t> tmpRewards;
{ {
ReadLock lock(m_sidechainLock); ReadLock lock(m_sidechainLock);
@ -755,20 +766,19 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
return true; return true;
} }
if (!get_shares(block, tmpShares) || !split_reward(total_reward, tmpShares, tmpRewards) || (tmpRewards.size() != tmpShares.size())) { data = std::make_shared<Data>();
data->txkeySec = block->m_txkeySec;
if (!get_shares(block, data->tmpShares) || !split_reward(total_reward, data->tmpShares, tmpRewards) || (tmpRewards.size() != data->tmpShares.size())) {
return false; return false;
} }
} }
const size_t n = tmpShares.size(); const size_t n = data->tmpShares.size();
data->counter = static_cast<int>(n) - 1;
// Helper jobs call get_eph_public_key with indices in descending order // Helper jobs call get_eph_public_key with indices in descending order
// Current thread will process indices in ascending order so when they meet, everything will be cached // Current thread will process indices in ascending order so when they meet, everything will be cached
std::atomic<int> counter{ 0 };
std::atomic<int> num_helper_jobs_finished{ 0 };
int num_helper_jobs_started = 0;
if (loop) { if (loop) {
uint32_t HELPER_JOBS_COUNT = std::thread::hardware_concurrency(); uint32_t HELPER_JOBS_COUNT = std::thread::hardware_concurrency();
@ -785,38 +795,26 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
struct Work struct Work
{ {
uv_work_t req; uv_work_t req;
const std::vector<MinerShare>& tmpShares; std::shared_ptr<Data> data;
const hash& txkeySec;
std::atomic<int>& counter;
std::atomic<int>& num_helper_jobs_finished;
// Fix MSVC warnings
Work() = delete;
Work& operator=(Work&&) = delete;
}; };
counter = static_cast<int>(n) - 1;
num_helper_jobs_started = HELPER_JOBS_COUNT;
for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) { for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) {
Work* w = new Work{ {}, tmpShares, block->m_txkeySec, counter, num_helper_jobs_finished }; Work* w = new Work{ {}, data };
w->req.data = w; w->req.data = w;
const int err = uv_queue_work(loop, &w->req, const int err = uv_queue_work(loop, &w->req,
[](uv_work_t* req) [](uv_work_t* req)
{ {
Work* work = reinterpret_cast<Work*>(req->data); Data* d = reinterpret_cast<Work*>(req->data)->data.get();
hash eph_public_key; hash eph_public_key;
int index; int index;
while ((index = work->counter.fetch_sub(1)) >= 0) { while ((index = d->counter.fetch_sub(1)) >= 0) {
uint8_t view_tag; uint8_t view_tag;
if (!work->tmpShares[index].m_wallet->get_eph_public_key(work->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag)) { if (!d->tmpShares[index].m_wallet->get_eph_public_key(d->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag)) {
LOGWARN(6, "get_eph_public_key failed at index " << index); LOGWARN(6, "get_eph_public_key failed at index " << index);
} }
} }
++work->num_helper_jobs_finished;
}, },
[](uv_work_t* req, int /*status*/) [](uv_work_t* req, int /*status*/)
{ {
@ -825,7 +823,6 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
if (err) { if (err) {
LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err)); LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err));
--num_helper_jobs_started;
delete w; delete w;
} }
} }
@ -843,10 +840,10 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
hash eph_public_key; hash eph_public_key;
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
// stop helper jobs when they meet with current thread // stop helper jobs when they meet with current thread
const int c = counter.load(); const int c = data->counter.load();
if ((c >= 0) && (static_cast<int>(i) >= c)) { if ((c >= 0) && (static_cast<int>(i) >= c)) {
// this will cause all helper jobs to finish immediately // this will cause all helper jobs to finish immediately
counter = -1; data->counter = -1;
} }
writeVarint(tmpRewards[i], blob); writeVarint(tmpRewards[i], blob);
@ -854,7 +851,7 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
blob.emplace_back(tx_type); blob.emplace_back(tx_type);
uint8_t view_tag; uint8_t view_tag;
if (!tmpShares[i].m_wallet->get_eph_public_key(block->m_txkeySec, i, eph_public_key, view_tag)) { if (!data->tmpShares[i].m_wallet->get_eph_public_key(data->txkeySec, i, eph_public_key, view_tag)) {
LOGWARN(6, "get_eph_public_key failed at index " << i); LOGWARN(6, "get_eph_public_key failed at index " << i);
} }
blob.insert(blob.end(), eph_public_key.h, eph_public_key.h + HASH_SIZE); blob.insert(blob.end(), eph_public_key.h, eph_public_key.h + HASH_SIZE);
@ -867,16 +864,6 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
} }
block->m_outputs.shrink_to_fit(); block->m_outputs.shrink_to_fit();
if (loop) {
// this will cause all helper jobs to finish immediately
counter = -1;
while (num_helper_jobs_finished < num_helper_jobs_started) {
std::this_thread::yield();
}
}
return true; return true;
} }