SideChain: precalculate tx pubkeys for faster sync

This commit is contained in:
SChernykh 2022-07-14 09:04:14 +02:00
parent 4733f46a28
commit 02a8a512dc
5 changed files with 230 additions and 27 deletions

View file

@ -53,6 +53,7 @@ PoolBlock::PoolBlock()
, m_invalid(false)
, m_broadcasted(false)
, m_wantBroadcast(false)
, m_precalculated(false)
, m_localTimestamp(seconds_since_epoch())
{
uv_mutex_init_checked(&m_lock);
@ -114,6 +115,7 @@ PoolBlock& PoolBlock::operator=(const PoolBlock& b)
m_invalid = b.m_invalid;
m_broadcasted = b.m_broadcasted;
m_wantBroadcast = b.m_wantBroadcast;
m_precalculated = b.m_precalculated;
m_localTimestamp = seconds_since_epoch();
@ -242,6 +244,8 @@ void PoolBlock::reset_offchain_data()
m_broadcasted = false;
m_wantBroadcast = false;
m_precalculated = false;
m_localTimestamp = seconds_since_epoch();
}

View file

@ -132,6 +132,8 @@ struct PoolBlock
mutable bool m_broadcasted;
mutable bool m_wantBroadcast;
bool m_precalculated;
uint64_t m_localTimestamp;
void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash);

View file

@ -63,6 +63,7 @@ SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name)
, m_chainWindowSize(2160)
, m_unclePenalty(20)
, m_curDifficulty(m_minDifficulty)
, m_precalcFinished(false)
{
LOGINFO(1, log::LightCyan() << "network type = " << m_networkType);
@ -154,14 +155,39 @@ SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name)
memset(buf + 8, '*', HASH_SIZE * 2 - 16);
m_consensusIdDisplayStr.assign(buf);
LOGINFO(1, "consensus ID = " << log::LightCyan() << m_consensusIdDisplayStr.c_str());
uv_cond_init_checked(&m_precalcJobsCond);
uv_mutex_init_checked(&m_precalcJobsMutex);
m_precalcJobs.reserve(16);
uint32_t numThreads = std::thread::hardware_concurrency();
// Leave 1 CPU core free from worker threads
if (numThreads > 1) {
--numThreads;
}
// Use no more than 8 threads
numThreads = std::min<uint32_t>(numThreads, 8);
LOGINFO(4, "running " << numThreads << " pre-calculation workers");
for (uint32_t i = 0; i < numThreads; ++i) {
m_precalcWorkers.emplace_back(&SideChain::precalc_worker, this);
}
m_uniquePrecalcInputs = new unordered_set<size_t>();
}
SideChain::~SideChain()
{
finish_precalc();
uv_rwlock_destroy(&m_sidechainLock);
uv_mutex_destroy(&m_seenWalletsLock);
uv_mutex_destroy(&m_seenBlocksLock);
uv_rwlock_destroy(&m_curDifficultyLock);
for (const auto& it : m_blocksById) {
delete it.second;
}
@ -364,6 +390,50 @@ bool SideChain::get_shares(const PoolBlock* tip, std::vector<MinerShare>& shares
return true;
}
bool SideChain::get_wallets(const PoolBlock* tip, std::vector<const Wallet*>& wallets) const
{
// Collect wallets from each block in the PPLNS window, starting from the "tip"
wallets.clear();
wallets.reserve(m_chainWindowSize * 2);
uint64_t block_depth = 0;
const PoolBlock* cur = tip;
do {
wallets.push_back(&cur->m_minerWallet);
for (const hash& uncle_id : cur->m_uncles) {
auto it = m_blocksById.find(uncle_id);
if (it == m_blocksById.end()) {
return false;
}
// Skip uncles which are already out of PPLNS window
if (tip->m_sidechainHeight - it->second->m_sidechainHeight < m_chainWindowSize) {
wallets.push_back(&it->second->m_minerWallet);
}
}
++block_depth;
if ((block_depth >= m_chainWindowSize) || (cur->m_sidechainHeight == 0)) {
break;
}
auto it = m_blocksById.find(cur->m_parent);
if (it == m_blocksById.end()) {
return false;
}
cur = it->second;
} while (true);
// Remove duplicates
std::sort(wallets.begin(), wallets.end(), [](const Wallet* a, const Wallet* b) { return *a < *b; });
wallets.erase(std::unique(wallets.begin(), wallets.end(), [](const Wallet* a, const Wallet* b) { return *a == *b; }), wallets.end());
return true;
}
bool SideChain::block_seen(const PoolBlock& block)
{
// Check if it's some old block
@ -538,6 +608,9 @@ void SideChain::add_block(const PoolBlock& block)
m_blocksByHeight[new_block->m_sidechainHeight].push_back(new_block);
// Pre-calculate eph_public_keys during initial sync
launch_precalc(new_block);
update_depths(new_block);
if (new_block->m_verified) {
@ -1738,6 +1811,9 @@ void SideChain::prune_old_blocks()
if (p2pServer()) {
p2pServer()->clear_cached_blocks();
}
// Pre-calc workers are not needed anymore
finish_precalc();
}
}
@ -1855,4 +1931,104 @@ bool SideChain::check_config()
return true;
}
void SideChain::launch_precalc(const PoolBlock* block)
{
if (m_precalcFinished) {
return;
}
auto it = m_blocksByHeight.find(block->m_sidechainHeight + m_chainWindowSize - 1);
if ((it != m_blocksByHeight.end()) && !it->second.empty()) {
for (PoolBlock* b : it->second) {
std::vector<const Wallet*> wallets;
if (!b->m_precalculated && get_wallets(b, wallets)) {
b->m_precalculated = true;
PrecalcJob* job = new PrecalcJob{ b, std::move(wallets) };
{
MutexLock lock2(m_precalcJobsMutex);
m_precalcJobs.push_back(job);
}
uv_cond_signal(&m_precalcJobsCond);
}
}
}
}
void SideChain::precalc_worker()
{
do {
PrecalcJob* job;
{
MutexLock lock(m_precalcJobsMutex);
if (m_precalcFinished) {
return;
}
while (m_precalcJobs.empty()) {
uv_cond_wait(&m_precalcJobsCond, &m_precalcJobsMutex);
// cppcheck-suppress knownConditionTrueFalse
if (m_precalcFinished) {
return;
}
}
job = m_precalcJobs.back();
m_precalcJobs.pop_back();
// Filter out duplicate inputs for get_eph_public_key()
uint8_t t[HASH_SIZE * 2 + sizeof(size_t)];
memcpy(t, job->b->m_txkeySec.h, HASH_SIZE);
for (size_t i = 0, n = job->wallets.size(); i < n; ++i) {
memcpy(t + HASH_SIZE, job->wallets[i]->view_public_key().h, HASH_SIZE);
memcpy(t + HASH_SIZE * 2, &i, sizeof(i));
if (!m_uniquePrecalcInputs->insert(robin_hood::hash_bytes(t, array_size(t))).second) {
job->wallets[i] = nullptr;
}
}
}
for (size_t i = 0, n = job->wallets.size(); i < n; ++i) {
if (job->wallets[i]) {
hash eph_public_key;
uint8_t view_tag;
job->wallets[i]->get_eph_public_key(job->b->m_txkeySec, i, eph_public_key, view_tag);
}
}
delete job;
} while (true);
}
void SideChain::finish_precalc()
{
if (m_precalcFinished.exchange(true)) {
return;
}
{
MutexLock lock(m_precalcJobsMutex);
for (PrecalcJob* job : m_precalcJobs) {
delete job;
}
m_precalcJobs.clear();
m_precalcJobs.shrink_to_fit();
uv_cond_broadcast(&m_precalcJobsCond);
}
for (std::thread& t : m_precalcWorkers) {
t.join();
}
m_precalcWorkers.clear();
m_precalcWorkers.shrink_to_fit();
delete m_uniquePrecalcInputs;
uv_mutex_destroy(&m_precalcJobsMutex);
uv_cond_destroy(&m_precalcJobsCond);
LOGINFO(4, "pre-calculation workers stopped");
}
} // namespace p2pool

View file

@ -19,6 +19,7 @@
#include "uv_util.h"
#include <map>
#include <thread>
namespace p2pool {
@ -37,7 +38,7 @@ struct MinerShare
const Wallet* m_wallet;
};
class SideChain
class SideChain : public nocopy_nomove
{
public:
SideChain(p2pool* pool, NetworkType type, const char* pool_name = nullptr);
@ -86,6 +87,7 @@ private:
private:
bool get_shares(const PoolBlock* tip, std::vector<MinerShare>& shares) const;
bool get_difficulty(const PoolBlock* tip, std::vector<DifficultyData>& difficultyData, difficulty_type& curDifficulty) const;
bool get_wallets(const PoolBlock* tip, std::vector<const Wallet*>& wallets) const;
void verify_loop(PoolBlock* block);
void verify(PoolBlock* block);
void update_chain_tip(const PoolBlock* block);
@ -128,6 +130,25 @@ private:
ChainMain m_watchBlock;
hash m_watchBlockSidechainId;
struct PrecalcJob
{
const PoolBlock* b;
std::vector<const Wallet*> wallets;
};
uv_cond_t m_precalcJobsCond;
uv_mutex_t m_precalcJobsMutex;
std::vector<PrecalcJob*> m_precalcJobs;
std::vector<std::thread> m_precalcWorkers;
unordered_set<size_t>* m_uniquePrecalcInputs;
std::atomic<bool> m_precalcFinished;
void launch_precalc(const PoolBlock* block);
void precalc_worker();
void finish_precalc();
};
} // namespace p2pool

View file

@ -103,39 +103,39 @@ TEST(pool_block, deserialize)
TEST(pool_block, verify)
{
init_crypto_cache();
{
PoolBlock b;
SideChain sidechain(nullptr, NetworkType::Mainnet);
PoolBlock b;
SideChain sidechain(nullptr, NetworkType::Mainnet);
std::ifstream f("sidechain_dump.dat", std::ios::binary | std::ios::ate);
ASSERT_EQ(f.good() && f.is_open(), true);
std::ifstream f("sidechain_dump.dat", std::ios::binary | std::ios::ate);
ASSERT_EQ(f.good() && f.is_open(), true);
std::vector<uint8_t> buf(f.tellg());
f.seekg(0);
f.read(reinterpret_cast<char*>(buf.data()), buf.size());
ASSERT_EQ(f.good(), true);
std::vector<uint8_t> buf(f.tellg());
f.seekg(0);
f.read(reinterpret_cast<char*>(buf.data()), buf.size());
ASSERT_EQ(f.good(), true);
for (const uint8_t *p = buf.data(), *e = buf.data() + buf.size(); p < e;) {
ASSERT_TRUE(p + sizeof(uint32_t) <= e);
const uint32_t n = *reinterpret_cast<const uint32_t*>(p);
p += sizeof(uint32_t);
for (const uint8_t *p = buf.data(), *e = buf.data() + buf.size(); p < e;) {
ASSERT_TRUE(p + sizeof(uint32_t) <= e);
const uint32_t n = *reinterpret_cast<const uint32_t*>(p);
p += sizeof(uint32_t);
ASSERT_TRUE(p + n <= e);
ASSERT_EQ(b.deserialize(p, n, sidechain), 0);
p += n;
ASSERT_TRUE(p + n <= e);
ASSERT_EQ(b.deserialize(p, n, sidechain), 0);
p += n;
sidechain.add_block(b);
ASSERT_TRUE(sidechain.find_block(b.m_sidechainId) != nullptr);
}
sidechain.add_block(b);
ASSERT_TRUE(sidechain.find_block(b.m_sidechainId) != nullptr);
const PoolBlock* tip = sidechain.chainTip();
ASSERT_TRUE(tip != nullptr);
ASSERT_TRUE(tip->m_verified);
ASSERT_FALSE(tip->m_invalid);
ASSERT_EQ(tip->m_txinGenHeight, 2483901);
ASSERT_EQ(tip->m_sidechainHeight, 522805);
}
const PoolBlock* tip = sidechain.chainTip();
ASSERT_TRUE(tip != nullptr);
ASSERT_TRUE(tip->m_verified);
ASSERT_FALSE(tip->m_invalid);
ASSERT_EQ(tip->m_txinGenHeight, 2483901);
ASSERT_EQ(tip->m_sidechainHeight, 522805);
destroy_crypto_cache();
}