diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 946a951..966365a 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -52,11 +52,15 @@ P2PServer::P2PServer(p2pool* pool) , m_cachedBlocks(nullptr) , m_rng(RandomDeviceSeed::instance) , m_block(new PoolBlock()) + , m_blockDeserializeResult(0) , m_timer{} , m_timerCounter(0) , m_timerInterval(2) , m_peerListLastSaved(0) + , m_lookForMissingBlocks(true) { + m_blockDeserializeBuf.reserve(131072); + // Diffuse the initial state in case it has low quality m_rng.discard(10000); @@ -211,16 +215,15 @@ void P2PServer::update_peer_connections() unordered_set connected_clients; { MutexLock lock(m_clientsListLock); + connected_clients.reserve(m_numConnections); for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { - bool disconnected = false; - const int timeout = client->m_handshakeComplete ? 300 : 10; if (cur_time >= client->m_lastAlive + timeout) { const uint64_t idle_time = static_cast(cur_time - client->m_lastAlive); LOGWARN(5, "peer " << static_cast(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting"); client->close(); - disconnected = true; + continue; } if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) { @@ -234,15 +237,13 @@ void P2PServer::update_peer_connections() client->ban(DEFAULT_BAN_TIME); remove_peer_from_list(client); client->close(); - disconnected = true; + continue; } } - if (!disconnected) { - connected_clients.insert(client->m_addr); - if (client->m_handshakeComplete && !client->m_handshakeInvalid && (client->m_listenPort >= 0)) { - has_good_peers = true; - } + connected_clients.insert(client->m_addr); + if (client->is_good()) { + has_good_peers = true; } } } @@ -305,35 +306,30 @@ void P2PServer::update_peer_connections() void P2PServer::update_peer_list() { const uint64_t cur_time = seconds_since_epoch(); - { - MutexLock lock(m_clientsListLock); - for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { - if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) { - continue; - } + MutexLock lock(m_clientsListLock); - if (cur_time >= client->m_nextOutgoingPeerListRequest) { - // Send peer list requests at random intervals (60-120 seconds) - client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61)); + for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { + if (client->is_good() && (cur_time >= client->m_nextOutgoingPeerListRequest)) { + // Send peer list requests at random intervals (60-120 seconds) + client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61)); - const bool result = send(client, - [](void* buf, size_t buf_size) - { - LOGINFO(5, "sending PEER_LIST_REQUEST"); + const bool result = send(client, + [](void* buf, size_t buf_size) + { + LOGINFO(5, "sending PEER_LIST_REQUEST"); - if (buf_size < SEND_BUF_MIN_SIZE) { - return 0; - } + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } - *reinterpret_cast(buf) = static_cast(MessageId::PEER_LIST_REQUEST); - return 1; - }); + *reinterpret_cast(buf) = static_cast(MessageId::PEER_LIST_REQUEST); + return 1; + }); - if (result) { - client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now(); - ++client->m_peerListPendingRequests; - } + if (result) { + client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now(); + ++client->m_peerListPendingRequests; } } } @@ -895,6 +891,24 @@ void P2PServer::show_peers() LOGINFO(0, "Total: " << n << " peers"); } +int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size) +{ + int result; + + if ((m_blockDeserializeBuf.size() == size) && (memcmp(m_blockDeserializeBuf.data(), buf, size) == 0)) { + m_block->reset_offchain_data(); + result = m_blockDeserializeResult; + } + else { + result = m_block->deserialize(buf, size, m_pool->side_chain()); + m_blockDeserializeBuf.assign(buf, buf + size); + m_blockDeserializeResult = result; + m_lookForMissingBlocks = true; + } + + return result; +} + void P2PServer::on_timer() { ++m_timerCounter; @@ -948,10 +962,16 @@ void P2PServer::flush_cache() void P2PServer::download_missing_blocks() { + if (!m_lookForMissingBlocks) { + return; + } + std::vector missing_blocks; m_pool->side_chain().get_missing_blocks(missing_blocks); if (missing_blocks.empty()) { + m_lookForMissingBlocks = false; + MutexLock lock(m_missingBlockRequestsLock); m_missingBlockRequests.clear(); return; @@ -967,11 +987,9 @@ void P2PServer::download_missing_blocks() clients.reserve(m_numConnections); for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { - if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) { - continue; + if (client->is_good()) { + clients.emplace_back(client); } - - clients.emplace_back(client); } if (clients.empty()) { @@ -1032,7 +1050,7 @@ void P2PServer::download_missing_blocks() void P2PServer::check_zmq() { - if ((m_timerCounter % 30) != 0) { + if ((m_timerCounter % 30) != 3) { return; } @@ -1764,7 +1782,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) MutexLock lock(server->m_blockLock); - const int result = server->m_block->deserialize(buf, size, server->m_pool->side_chain()); + const int result = server->deserialize_block(buf, size); if (result != 0) { LOGWARN(3, "peer " << static_cast(m_addrString) << " sent an invalid block, error " << result); return false; @@ -1773,7 +1791,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) if (m_chainTipBlockRequest) { m_chainTipBlockRequest = false; - const uint64_t peer_height = server->m_block->m_txinGenHeight; + const uint64_t peer_height = server->get_block()->m_txinGenHeight; const uint64_t our_height = server->m_pool->miner_data().height; if (peer_height + 2 < our_height) { @@ -1782,7 +1800,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) } } - return handle_incoming_block_async(server->m_block); + return handle_incoming_block_async(server->get_block()); } bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) @@ -1796,19 +1814,21 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) MutexLock lock(server->m_blockLock); - const int result = server->m_block->deserialize(buf, size, server->m_pool->side_chain()); + const int result = server->deserialize_block(buf, size); if (result != 0) { LOGWARN(3, "peer " << static_cast(m_addrString) << " sent an invalid block, error " << result); return false; } - m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = server->m_block->m_sidechainId; + const PoolBlock* block = server->get_block(); + + m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = block->m_sidechainId; MinerData miner_data = server->m_pool->miner_data(); - if (server->m_block->m_prevId != miner_data.prev_id) { + if (block->m_prevId != miner_data.prev_id) { // This peer is mining on top of a different Monero block, investigate it - const uint64_t peer_height = server->m_block->m_txinGenHeight; + const uint64_t peer_height = block->m_txinGenHeight; const uint64_t our_height = miner_data.height; if (peer_height < our_height) { @@ -1838,11 +1858,11 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) } } - server->m_block->m_wantBroadcast = true; + block->m_wantBroadcast = true; m_lastBroadcastTimestamp = seconds_since_epoch(); - return handle_incoming_block_async(server->m_block); + return handle_incoming_block_async(block); } bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) @@ -1967,7 +1987,7 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const return true; } -bool P2PServer::P2PClient::handle_incoming_block_async(PoolBlock* block) +bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block) { P2PServer* server = static_cast(m_owner); diff --git a/src/p2p_server.h b/src/p2p_server.h index 9e70054..0119834 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -97,10 +97,12 @@ public: bool on_peer_list_request(const uint8_t* buf); bool on_peer_list_response(const uint8_t* buf) const; - bool handle_incoming_block_async(PoolBlock* block); + bool handle_incoming_block_async(const PoolBlock* block); void handle_incoming_block(p2pool* pool, PoolBlock& block, const uint32_t reset_counter, const raw_ip& addr, std::vector& missing_blocks); void post_handle_incoming_block(const uint32_t reset_counter, std::vector& missing_blocks); + bool is_good() const { return m_handshakeComplete && !m_handshakeInvalid && (m_listenPort >= 0); } + uint64_t m_peerId; MessageId m_expectedMessage; uint64_t m_handshakeChallenge; @@ -141,6 +143,9 @@ public: void set_max_outgoing_peers(uint32_t n) { m_maxOutgoingPeers = std::min(std::max(n, 10U), 1000U); } void set_max_incoming_peers(uint32_t n) { m_maxIncomingPeers = std::min(std::max(n, 10U), 1000U); } + int deserialize_block(const uint8_t* buf, uint32_t size); + const PoolBlock* get_block() const { return m_block; } + private: p2pool* m_pool; BlockCache* m_cache; @@ -174,6 +179,8 @@ private: uv_mutex_t m_blockLock; PoolBlock* m_block; + std::vector m_blockDeserializeBuf; + int m_blockDeserializeResult; uv_timer_t m_timer; uint64_t m_timerCounter; @@ -207,6 +214,8 @@ private: uv_async_t m_broadcastAsync; std::vector m_broadcastQueue; + bool m_lookForMissingBlocks; + uv_mutex_t m_missingBlockRequestsLock; unordered_set> m_missingBlockRequests; diff --git a/src/pool_block.cpp b/src/pool_block.cpp index 9c754de..9db3091 100644 --- a/src/pool_block.cpp +++ b/src/pool_block.cpp @@ -229,6 +229,22 @@ void PoolBlock::serialize_sidechain_data() writeVarint(m_cumulativeDifficulty.hi, m_sideChainData); } +void PoolBlock::reset_offchain_data() +{ + // Defaults for off-chain variables + m_tmpTxExtra.clear(); + + m_depth = 0; + + m_verified = false; + m_invalid = false; + + m_broadcasted = false; + m_wantBroadcast = false; + + m_localTimestamp = seconds_since_epoch(); +} + bool PoolBlock::get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash) { alignas(8) uint8_t hashes[HASH_SIZE * 3]; diff --git a/src/pool_block.h b/src/pool_block.h index e9afc64..8a0c3b2 100644 --- a/src/pool_block.h +++ b/src/pool_block.h @@ -129,15 +129,17 @@ struct PoolBlock bool m_verified; bool m_invalid; - bool m_broadcasted; - bool m_wantBroadcast; + mutable bool m_broadcasted; + mutable bool m_wantBroadcast; uint64_t m_localTimestamp; void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash); void serialize_sidechain_data(); - int deserialize(const uint8_t* data, size_t size, SideChain& sidechain); + int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain); + void reset_offchain_data(); + bool get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash); uint64_t get_payout(const Wallet& w) const; diff --git a/src/pool_block_parser.inl b/src/pool_block_parser.inl index dae6aa8..f34d949 100644 --- a/src/pool_block_parser.inl +++ b/src/pool_block_parser.inl @@ -23,7 +23,7 @@ namespace p2pool { // Since data here can come from external and possibly malicious sources, check everything // Only the syntax (i.e. the serialized block binary format) and the keccak hash are checked here // Semantics must also be checked elsewhere before accepting the block (PoW, reward split between miners, difficulty calculation and so on) -int PoolBlock::deserialize(const uint8_t* data, size_t size, SideChain& sidechain) +int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain) { try { // Sanity check @@ -333,19 +333,7 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, SideChain& sidechai return __LINE__; } - // Defaults for off-chain variables - m_tmpTxExtra.clear(); - - m_depth = 0; - - m_verified = false; - m_invalid = false; - - m_broadcasted = false; - m_wantBroadcast = false; - - m_localTimestamp = seconds_since_epoch(); - + reset_offchain_data(); return 0; } diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 9fff994..7093e1b 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -38,13 +38,6 @@ #include #include -// Only uncomment it to debug issues with uncle/orphan blocks -//#define DEBUG_BROADCAST_DELAY_MS 100 - -#ifdef DEBUG_BROADCAST_DELAY_MS -#include -#endif - static constexpr char log_category_prefix[] = "SideChain "; static constexpr uint64_t MIN_DIFFICULTY = 100000; @@ -289,7 +282,7 @@ P2PServer* SideChain::p2pServer() const return m_pool ? m_pool->p2p_server() : nullptr; } -bool SideChain::get_shares(PoolBlock* tip, std::vector& shares) const +bool SideChain::get_shares(const PoolBlock* tip, std::vector& shares) const { shares.clear(); shares.reserve(m_chainWindowSize * 2); @@ -297,7 +290,7 @@ bool SideChain::get_shares(PoolBlock* tip, std::vector& shares) cons // Collect shares from each block in the PPLNS window, starting from the "tip" uint64_t block_depth = 0; - PoolBlock* cur = tip; + const PoolBlock* cur = tip; do { MinerShare cur_share{ cur->m_difficulty.lo, &cur->m_minerWallet }; @@ -946,11 +939,11 @@ bool SideChain::split_reward(uint64_t reward, const std::vector& sha return true; } -bool SideChain::get_difficulty(PoolBlock* tip, std::vector& difficultyData, difficulty_type& curDifficulty) const +bool SideChain::get_difficulty(const PoolBlock* tip, std::vector& difficultyData, difficulty_type& curDifficulty) const { difficultyData.clear(); - PoolBlock* cur = tip; + const PoolBlock* cur = tip; uint64_t oldest_timestamp = std::numeric_limits::max(); uint64_t block_depth = 0; @@ -1442,7 +1435,7 @@ void SideChain::verify(PoolBlock* block) block->m_invalid = false; } -void SideChain::update_chain_tip(PoolBlock* block) +void SideChain::update_chain_tip(const PoolBlock* block) { if (!block->m_verified || block->m_invalid) { LOGERR(1, "trying to update chain tip to an unverified or invalid block, fix the code!"); @@ -1460,7 +1453,7 @@ void SideChain::update_chain_tip(PoolBlock* block) if (is_longer_chain(tip, block, is_alternative)) { difficulty_type diff; if (get_difficulty(block, m_difficultyData, diff)) { - m_chainTip = block; + m_chainTip = const_cast(block); { WriteLock lock(m_curDifficultyLock); m_curDifficulty = diff; @@ -1503,36 +1496,7 @@ void SideChain::update_chain_tip(PoolBlock* block) if (p2pServer() && block->m_wantBroadcast && !block->m_broadcasted) { block->m_broadcasted = true; -#ifdef DEBUG_BROADCAST_DELAY_MS - struct Work - { - uv_work_t req; - P2PServer* server; - PoolBlock* block; - }; - Work* work = new Work{}; - work->req.data = work; - work->server = p2pServer(); - work->block = block; - const int err = uv_queue_work(uv_default_loop(), &work->req, - [](uv_work_t*) - { - num_running_jobs.fetch_add(1); - std::this_thread::sleep_for(std::chrono::milliseconds(DEBUG_BROADCAST_DELAY_MS)); - }, - [](uv_work_t* req, int) - { - Work* work = reinterpret_cast(req->data); - work->server->broadcast(*work->block); - delete reinterpret_cast(req->data); - num_running_jobs.fetch_sub(1); - }); - if (err) { - LOGERR(1, "update_chain_tip: uv_queue_work failed, error " << uv_err_name(err)); - } -#else p2pServer()->broadcast(*block); -#endif } } diff --git a/src/side_chain.h b/src/side_chain.h index 60bc6b0..57fea94 100644 --- a/src/side_chain.h +++ b/src/side_chain.h @@ -31,10 +31,10 @@ class Wallet; struct MinerShare { FORCEINLINE MinerShare() : m_weight(0), m_wallet(nullptr) {} - FORCEINLINE MinerShare(uint64_t w, Wallet* x) : m_weight(w), m_wallet(x) {} + FORCEINLINE MinerShare(uint64_t w, const Wallet* x) : m_weight(w), m_wallet(x) {} uint64_t m_weight; - Wallet* m_wallet; + const Wallet* m_wallet; }; class SideChain @@ -84,11 +84,11 @@ private: NetworkType m_networkType; private: - bool get_shares(PoolBlock* tip, std::vector& shares) const; - bool get_difficulty(PoolBlock* tip, std::vector& difficultyData, difficulty_type& curDifficulty) const; + bool get_shares(const PoolBlock* tip, std::vector& shares) const; + bool get_difficulty(const PoolBlock* tip, std::vector& difficultyData, difficulty_type& curDifficulty) const; void verify_loop(PoolBlock* block); void verify(PoolBlock* block); - void update_chain_tip(PoolBlock* block); + void update_chain_tip(const PoolBlock* block); PoolBlock* get_parent(const PoolBlock* block) const; // Checks if "candidate" has longer (higher difficulty) chain than "block"