P2PServer: don't deserialize the same block twice

This commit is contained in:
SChernykh 2022-07-10 10:24:03 +02:00
parent ae161fac49
commit c5bd184bbc
7 changed files with 111 additions and 112 deletions

View file

@ -52,11 +52,15 @@ P2PServer::P2PServer(p2pool* pool)
, m_cachedBlocks(nullptr)
, m_rng(RandomDeviceSeed::instance)
, m_block(new PoolBlock())
, m_blockDeserializeResult(0)
, m_timer{}
, m_timerCounter(0)
, m_timerInterval(2)
, m_peerListLastSaved(0)
, m_lookForMissingBlocks(true)
{
m_blockDeserializeBuf.reserve(131072);
// Diffuse the initial state in case it has low quality
m_rng.discard(10000);
@ -211,16 +215,15 @@ void P2PServer::update_peer_connections()
unordered_set<raw_ip> connected_clients;
{
MutexLock lock(m_clientsListLock);
connected_clients.reserve(m_numConnections);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
bool disconnected = false;
const int timeout = client->m_handshakeComplete ? 300 : 10;
if (cur_time >= client->m_lastAlive + timeout) {
const uint64_t idle_time = static_cast<uint64_t>(cur_time - client->m_lastAlive);
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting");
client->close();
disconnected = true;
continue;
}
if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) {
@ -234,15 +237,13 @@ void P2PServer::update_peer_connections()
client->ban(DEFAULT_BAN_TIME);
remove_peer_from_list(client);
client->close();
disconnected = true;
continue;
}
}
if (!disconnected) {
connected_clients.insert(client->m_addr);
if (client->m_handshakeComplete && !client->m_handshakeInvalid && (client->m_listenPort >= 0)) {
has_good_peers = true;
}
connected_clients.insert(client->m_addr);
if (client->is_good()) {
has_good_peers = true;
}
}
}
@ -305,35 +306,30 @@ void P2PServer::update_peer_connections()
void P2PServer::update_peer_list()
{
const uint64_t cur_time = seconds_since_epoch();
{
MutexLock lock(m_clientsListLock);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) {
continue;
}
MutexLock lock(m_clientsListLock);
if (cur_time >= client->m_nextOutgoingPeerListRequest) {
// Send peer list requests at random intervals (60-120 seconds)
client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61));
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (client->is_good() && (cur_time >= client->m_nextOutgoingPeerListRequest)) {
// Send peer list requests at random intervals (60-120 seconds)
client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61));
const bool result = send(client,
[](void* buf, size_t buf_size)
{
LOGINFO(5, "sending PEER_LIST_REQUEST");
const bool result = send(client,
[](void* buf, size_t buf_size)
{
LOGINFO(5, "sending PEER_LIST_REQUEST");
if (buf_size < SEND_BUF_MIN_SIZE) {
return 0;
}
if (buf_size < SEND_BUF_MIN_SIZE) {
return 0;
}
*reinterpret_cast<uint8_t*>(buf) = static_cast<uint8_t>(MessageId::PEER_LIST_REQUEST);
return 1;
});
*reinterpret_cast<uint8_t*>(buf) = static_cast<uint8_t>(MessageId::PEER_LIST_REQUEST);
return 1;
});
if (result) {
client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now();
++client->m_peerListPendingRequests;
}
if (result) {
client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now();
++client->m_peerListPendingRequests;
}
}
}
@ -895,6 +891,24 @@ void P2PServer::show_peers()
LOGINFO(0, "Total: " << n << " peers");
}
int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size)
{
int result;
if ((m_blockDeserializeBuf.size() == size) && (memcmp(m_blockDeserializeBuf.data(), buf, size) == 0)) {
m_block->reset_offchain_data();
result = m_blockDeserializeResult;
}
else {
result = m_block->deserialize(buf, size, m_pool->side_chain());
m_blockDeserializeBuf.assign(buf, buf + size);
m_blockDeserializeResult = result;
m_lookForMissingBlocks = true;
}
return result;
}
void P2PServer::on_timer()
{
++m_timerCounter;
@ -948,10 +962,16 @@ void P2PServer::flush_cache()
void P2PServer::download_missing_blocks()
{
if (!m_lookForMissingBlocks) {
return;
}
std::vector<hash> missing_blocks;
m_pool->side_chain().get_missing_blocks(missing_blocks);
if (missing_blocks.empty()) {
m_lookForMissingBlocks = false;
MutexLock lock(m_missingBlockRequestsLock);
m_missingBlockRequests.clear();
return;
@ -967,11 +987,9 @@ void P2PServer::download_missing_blocks()
clients.reserve(m_numConnections);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) {
continue;
if (client->is_good()) {
clients.emplace_back(client);
}
clients.emplace_back(client);
}
if (clients.empty()) {
@ -1032,7 +1050,7 @@ void P2PServer::download_missing_blocks()
void P2PServer::check_zmq()
{
if ((m_timerCounter % 30) != 0) {
if ((m_timerCounter % 30) != 3) {
return;
}
@ -1764,7 +1782,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size)
MutexLock lock(server->m_blockLock);
const int result = server->m_block->deserialize(buf, size, server->m_pool->side_chain());
const int result = server->deserialize_block(buf, size);
if (result != 0) {
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " sent an invalid block, error " << result);
return false;
@ -1773,7 +1791,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size)
if (m_chainTipBlockRequest) {
m_chainTipBlockRequest = false;
const uint64_t peer_height = server->m_block->m_txinGenHeight;
const uint64_t peer_height = server->get_block()->m_txinGenHeight;
const uint64_t our_height = server->m_pool->miner_data().height;
if (peer_height + 2 < our_height) {
@ -1782,7 +1800,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size)
}
}
return handle_incoming_block_async(server->m_block);
return handle_incoming_block_async(server->get_block());
}
bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size)
@ -1796,19 +1814,21 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size)
MutexLock lock(server->m_blockLock);
const int result = server->m_block->deserialize(buf, size, server->m_pool->side_chain());
const int result = server->deserialize_block(buf, size);
if (result != 0) {
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " sent an invalid block, error " << result);
return false;
}
m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = server->m_block->m_sidechainId;
const PoolBlock* block = server->get_block();
m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = block->m_sidechainId;
MinerData miner_data = server->m_pool->miner_data();
if (server->m_block->m_prevId != miner_data.prev_id) {
if (block->m_prevId != miner_data.prev_id) {
// This peer is mining on top of a different Monero block, investigate it
const uint64_t peer_height = server->m_block->m_txinGenHeight;
const uint64_t peer_height = block->m_txinGenHeight;
const uint64_t our_height = miner_data.height;
if (peer_height < our_height) {
@ -1838,11 +1858,11 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size)
}
}
server->m_block->m_wantBroadcast = true;
block->m_wantBroadcast = true;
m_lastBroadcastTimestamp = seconds_since_epoch();
return handle_incoming_block_async(server->m_block);
return handle_incoming_block_async(block);
}
bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
@ -1967,7 +1987,7 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const
return true;
}
bool P2PServer::P2PClient::handle_incoming_block_async(PoolBlock* block)
bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block)
{
P2PServer* server = static_cast<P2PServer*>(m_owner);

View file

@ -97,10 +97,12 @@ public:
bool on_peer_list_request(const uint8_t* buf);
bool on_peer_list_response(const uint8_t* buf) const;
bool handle_incoming_block_async(PoolBlock* block);
bool handle_incoming_block_async(const PoolBlock* block);
void handle_incoming_block(p2pool* pool, PoolBlock& block, const uint32_t reset_counter, const raw_ip& addr, std::vector<hash>& missing_blocks);
void post_handle_incoming_block(const uint32_t reset_counter, std::vector<hash>& missing_blocks);
bool is_good() const { return m_handshakeComplete && !m_handshakeInvalid && (m_listenPort >= 0); }
uint64_t m_peerId;
MessageId m_expectedMessage;
uint64_t m_handshakeChallenge;
@ -141,6 +143,9 @@ public:
void set_max_outgoing_peers(uint32_t n) { m_maxOutgoingPeers = std::min(std::max(n, 10U), 1000U); }
void set_max_incoming_peers(uint32_t n) { m_maxIncomingPeers = std::min(std::max(n, 10U), 1000U); }
int deserialize_block(const uint8_t* buf, uint32_t size);
const PoolBlock* get_block() const { return m_block; }
private:
p2pool* m_pool;
BlockCache* m_cache;
@ -174,6 +179,8 @@ private:
uv_mutex_t m_blockLock;
PoolBlock* m_block;
std::vector<uint8_t> m_blockDeserializeBuf;
int m_blockDeserializeResult;
uv_timer_t m_timer;
uint64_t m_timerCounter;
@ -207,6 +214,8 @@ private:
uv_async_t m_broadcastAsync;
std::vector<Broadcast*> m_broadcastQueue;
bool m_lookForMissingBlocks;
uv_mutex_t m_missingBlockRequestsLock;
unordered_set<std::pair<uint64_t, uint64_t>> m_missingBlockRequests;

View file

@ -229,6 +229,22 @@ void PoolBlock::serialize_sidechain_data()
writeVarint(m_cumulativeDifficulty.hi, m_sideChainData);
}
void PoolBlock::reset_offchain_data()
{
// Defaults for off-chain variables
m_tmpTxExtra.clear();
m_depth = 0;
m_verified = false;
m_invalid = false;
m_broadcasted = false;
m_wantBroadcast = false;
m_localTimestamp = seconds_since_epoch();
}
bool PoolBlock::get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash)
{
alignas(8) uint8_t hashes[HASH_SIZE * 3];

View file

@ -129,15 +129,17 @@ struct PoolBlock
bool m_verified;
bool m_invalid;
bool m_broadcasted;
bool m_wantBroadcast;
mutable bool m_broadcasted;
mutable bool m_wantBroadcast;
uint64_t m_localTimestamp;
void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash);
void serialize_sidechain_data();
int deserialize(const uint8_t* data, size_t size, SideChain& sidechain);
int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain);
void reset_offchain_data();
bool get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash);
uint64_t get_payout(const Wallet& w) const;

View file

@ -23,7 +23,7 @@ namespace p2pool {
// Since data here can come from external and possibly malicious sources, check everything
// Only the syntax (i.e. the serialized block binary format) and the keccak hash are checked here
// Semantics must also be checked elsewhere before accepting the block (PoW, reward split between miners, difficulty calculation and so on)
int PoolBlock::deserialize(const uint8_t* data, size_t size, SideChain& sidechain)
int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain)
{
try {
// Sanity check
@ -333,19 +333,7 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, SideChain& sidechai
return __LINE__;
}
// Defaults for off-chain variables
m_tmpTxExtra.clear();
m_depth = 0;
m_verified = false;
m_invalid = false;
m_broadcasted = false;
m_wantBroadcast = false;
m_localTimestamp = seconds_since_epoch();
reset_offchain_data();
return 0;
}

View file

@ -38,13 +38,6 @@
#include <iterator>
#include <numeric>
// Only uncomment it to debug issues with uncle/orphan blocks
//#define DEBUG_BROADCAST_DELAY_MS 100
#ifdef DEBUG_BROADCAST_DELAY_MS
#include <thread>
#endif
static constexpr char log_category_prefix[] = "SideChain ";
static constexpr uint64_t MIN_DIFFICULTY = 100000;
@ -289,7 +282,7 @@ P2PServer* SideChain::p2pServer() const
return m_pool ? m_pool->p2p_server() : nullptr;
}
bool SideChain::get_shares(PoolBlock* tip, std::vector<MinerShare>& shares) const
bool SideChain::get_shares(const PoolBlock* tip, std::vector<MinerShare>& shares) const
{
shares.clear();
shares.reserve(m_chainWindowSize * 2);
@ -297,7 +290,7 @@ bool SideChain::get_shares(PoolBlock* tip, std::vector<MinerShare>& shares) cons
// Collect shares from each block in the PPLNS window, starting from the "tip"
uint64_t block_depth = 0;
PoolBlock* cur = tip;
const PoolBlock* cur = tip;
do {
MinerShare cur_share{ cur->m_difficulty.lo, &cur->m_minerWallet };
@ -946,11 +939,11 @@ bool SideChain::split_reward(uint64_t reward, const std::vector<MinerShare>& sha
return true;
}
bool SideChain::get_difficulty(PoolBlock* tip, std::vector<DifficultyData>& difficultyData, difficulty_type& curDifficulty) const
bool SideChain::get_difficulty(const PoolBlock* tip, std::vector<DifficultyData>& difficultyData, difficulty_type& curDifficulty) const
{
difficultyData.clear();
PoolBlock* cur = tip;
const PoolBlock* cur = tip;
uint64_t oldest_timestamp = std::numeric_limits<uint64_t>::max();
uint64_t block_depth = 0;
@ -1442,7 +1435,7 @@ void SideChain::verify(PoolBlock* block)
block->m_invalid = false;
}
void SideChain::update_chain_tip(PoolBlock* block)
void SideChain::update_chain_tip(const PoolBlock* block)
{
if (!block->m_verified || block->m_invalid) {
LOGERR(1, "trying to update chain tip to an unverified or invalid block, fix the code!");
@ -1460,7 +1453,7 @@ void SideChain::update_chain_tip(PoolBlock* block)
if (is_longer_chain(tip, block, is_alternative)) {
difficulty_type diff;
if (get_difficulty(block, m_difficultyData, diff)) {
m_chainTip = block;
m_chainTip = const_cast<PoolBlock*>(block);
{
WriteLock lock(m_curDifficultyLock);
m_curDifficulty = diff;
@ -1503,36 +1496,7 @@ void SideChain::update_chain_tip(PoolBlock* block)
if (p2pServer() && block->m_wantBroadcast && !block->m_broadcasted) {
block->m_broadcasted = true;
#ifdef DEBUG_BROADCAST_DELAY_MS
struct Work
{
uv_work_t req;
P2PServer* server;
PoolBlock* block;
};
Work* work = new Work{};
work->req.data = work;
work->server = p2pServer();
work->block = block;
const int err = uv_queue_work(uv_default_loop(), &work->req,
[](uv_work_t*)
{
num_running_jobs.fetch_add(1);
std::this_thread::sleep_for(std::chrono::milliseconds(DEBUG_BROADCAST_DELAY_MS));
},
[](uv_work_t* req, int)
{
Work* work = reinterpret_cast<Work*>(req->data);
work->server->broadcast(*work->block);
delete reinterpret_cast<Work*>(req->data);
num_running_jobs.fetch_sub(1);
});
if (err) {
LOGERR(1, "update_chain_tip: uv_queue_work failed, error " << uv_err_name(err));
}
#else
p2pServer()->broadcast(*block);
#endif
}
}

View file

@ -31,10 +31,10 @@ class Wallet;
struct MinerShare
{
FORCEINLINE MinerShare() : m_weight(0), m_wallet(nullptr) {}
FORCEINLINE MinerShare(uint64_t w, Wallet* x) : m_weight(w), m_wallet(x) {}
FORCEINLINE MinerShare(uint64_t w, const Wallet* x) : m_weight(w), m_wallet(x) {}
uint64_t m_weight;
Wallet* m_wallet;
const Wallet* m_wallet;
};
class SideChain
@ -84,11 +84,11 @@ private:
NetworkType m_networkType;
private:
bool get_shares(PoolBlock* tip, std::vector<MinerShare>& shares) const;
bool get_difficulty(PoolBlock* tip, std::vector<DifficultyData>& difficultyData, difficulty_type& curDifficulty) const;
bool get_shares(const PoolBlock* tip, std::vector<MinerShare>& shares) const;
bool get_difficulty(const PoolBlock* tip, std::vector<DifficultyData>& difficultyData, difficulty_type& curDifficulty) const;
void verify_loop(PoolBlock* block);
void verify(PoolBlock* block);
void update_chain_tip(PoolBlock* block);
void update_chain_tip(const PoolBlock* block);
PoolBlock* get_parent(const PoolBlock* block) const;
// Checks if "candidate" has longer (higher difficulty) chain than "block"