From d8ecc1174d41913517f238d643e9b3518794bfde Mon Sep 17 00:00:00 2001 From: SChernykh Date: Fri, 7 Jul 2023 11:00:19 +0200 Subject: [PATCH] P2PServer: refactored BLOCK_NOTIFY logic - Limit how many block requests can be in flight - Don't send requests for the same block twice --- src/common.h | 13 +++++++----- src/log.h | 3 +++ src/p2p_server.cpp | 46 +++++++++++++++++++++++++++--------------- src/p2p_server.h | 7 ++++--- src/side_chain.cpp | 2 +- src/stratum_server.cpp | 6 +++--- src/util.cpp | 2 +- 7 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/common.h b/src/common.h index a6e673c..e8463b6 100644 --- a/src/common.h +++ b/src/common.h @@ -152,8 +152,8 @@ struct alignas(uint64_t) hash FORCEINLINE bool operator<(const hash& other) const { - const uint64_t* a = reinterpret_cast(h); - const uint64_t* b = reinterpret_cast(other.h); + const uint64_t* a = u64(); + const uint64_t* b = other.u64(); if (a[3] < b[3]) return true; if (a[3] > b[3]) return false; @@ -169,18 +169,21 @@ struct alignas(uint64_t) hash FORCEINLINE bool operator==(const hash& other) const { - const uint64_t* a = reinterpret_cast(h); - const uint64_t* b = reinterpret_cast(other.h); + const uint64_t* a = u64(); + const uint64_t* b = other.u64(); return (a[0] == b[0]) && (a[1] == b[1]) && (a[2] == b[2]) && (a[3] == b[3]); } FORCEINLINE bool operator!=(const hash& other) const { return !operator==(other); } FORCEINLINE bool empty() const { - const uint64_t* a = reinterpret_cast(h); + const uint64_t* a = u64(); return (a[0] == 0) && (a[1] == 0) && (a[2] == 0) && (a[3] == 0); } + FORCEINLINE uint64_t* u64() { return reinterpret_cast(h); } + FORCEINLINE const uint64_t* u64() const { return reinterpret_cast(h); } + friend std::ostream& operator<<(std::ostream& s, const hash& d); friend std::istream& operator>>(std::istream& s, hash& d); }; diff --git a/src/log.h b/src/log.h index d5e8fec..1f8092c 100644 --- a/src/log.h +++ b/src/log.h @@ -289,6 +289,9 @@ struct hex_buf { FORCEINLINE hex_buf(const uint8_t* data, size_t size) : m_data(data), m_size(size) {} + template + explicit FORCEINLINE hex_buf(const T* data) : m_data(reinterpret_cast(data)), m_size(sizeof(T)) {} + const uint8_t* m_data; size_t m_size; }; diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 4b69b11..b4dd1d9 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -1120,6 +1120,7 @@ void P2PServer::download_missing_blocks() if (missing_blocks.empty()) { m_lookForMissingBlocks = false; m_missingBlockRequests.clear(); + m_blockNotifyRequests.clear(); return; } @@ -1146,8 +1147,12 @@ void P2PServer::download_missing_blocks() for (const hash& id : missing_blocks) { P2PClient* client = clients[get_random64() % clients.size()]; - const uint64_t truncated_block_id = *reinterpret_cast(id.h); - if (!m_missingBlockRequests.insert({ client->m_peerId, truncated_block_id }).second) { + if (client->m_blockPendingRequests.size() >= 25) { + // Too many pending requests to this peer + continue; + } + + if (!m_missingBlockRequests.insert({ client->m_peerId, *id.u64() }).second) { // We already asked this peer about this block // Don't try to ask another peer, leave it for another timer tick continue; @@ -1182,7 +1187,7 @@ void P2PServer::download_missing_blocks() }); if (result) { - client->m_blockPendingRequests.push_back(id); + client->m_blockPendingRequests.push_back(*id.u64()); } } } @@ -1540,7 +1545,7 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size) if (bytes_left >= 1 + sizeof(uint32_t) + block_size) { bytes_read = 1 + sizeof(uint32_t) + block_size; - const hash expected_id = m_blockPendingRequests.front(); + const uint64_t expected_id = m_blockPendingRequests.front(); m_blockPendingRequests.pop_front(); if (!on_block_response(buf + 1 + sizeof(uint32_t), block_size, expected_id)) { @@ -1770,7 +1775,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH return; } - const uint64_t* value = reinterpret_cast(work->solution.h); + const uint64_t* value = work->solution.u64(); uint64_t high; umul128(value[HASH_SIZE / sizeof(uint64_t) - 1], CHALLENGE_DIFFICULTY, &high); @@ -1916,7 +1921,7 @@ bool P2PServer::P2PClient::on_handshake_solution(const uint8_t* buf) // Check that incoming connection provided enough PoW if (m_isIncoming) { - const uint64_t* value = reinterpret_cast(solution.h); + const uint64_t* value = solution.u64(); uint64_t high; umul128(value[HASH_SIZE / sizeof(uint64_t) - 1], CHALLENGE_DIFFICULTY, &high); @@ -1977,7 +1982,7 @@ void P2PServer::P2PClient::on_after_handshake(uint8_t* &p) memcpy(p, zero_hash.h, HASH_SIZE); p += HASH_SIZE; - m_blockPendingRequests.push_back(zero_hash); + m_blockPendingRequests.push_back(0); m_lastBroadcastTimestamp = seconds_since_epoch(); } @@ -2038,14 +2043,14 @@ bool P2PServer::P2PClient::on_block_request(const uint8_t* buf) }); } -bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size, const hash& expected_id) +bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size, uint64_t expected_id) { P2PServer* server = static_cast(m_owner); const uint64_t cur_time = seconds_since_epoch(); if (!size) { LOGINFO(5, "peer " << log::Gray() << static_cast(m_addrString) << log::NoColor() << " sent an empty block response"); - if (expected_id.empty() && (cur_time >= m_nextOutgoingPeerListRequest)) { + if ((expected_id == 0) && (cur_time >= m_nextOutgoingPeerListRequest)) { server->send_peer_list_request(this, cur_time); } return true; @@ -2064,7 +2069,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size, const PoolBlock* block = server->get_block(); // Chain tip request - if (expected_id.empty()) { + if (expected_id == 0) { const uint64_t peer_height = block->m_txinGenHeight; const uint64_t our_height = server->m_pool->miner_data().height; @@ -2077,8 +2082,8 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size, server->send_peer_list_request(this, cur_time); } } - else if (block->m_sidechainId != expected_id) { - LOGWARN(3, "peer " << static_cast(m_addrString) << " sent a wrong block: expected " << expected_id << ", got " << block->m_sidechainId); + else if (*block->m_sidechainId.u64() != expected_id) { + LOGWARN(3, "peer " << static_cast(m_addrString) << " sent a wrong block: expected " << log::hex_buf(&expected_id) << ", got " << block->m_sidechainId); return false; } @@ -2320,6 +2325,16 @@ void P2PServer::P2PClient::on_block_notify(const uint8_t* buf) if (!server->find_block(id)) { LOGINFO(5, "Received an unknown block " << id << " in BLOCK_NOTIFY"); + if (m_blockPendingRequests.size() >= 25) { + LOGINFO(5, "Too many pending requests, ignoring it"); + return; + } + + if (!server->m_blockNotifyRequests.insert(*id.u64()).second || !server->m_missingBlockRequests.insert({ m_peerId, *id.u64() }).second) { + LOGINFO(5, "BLOCK_REQUEST for id = " << id << " was already sent"); + return; + } + const bool result = server->send(this, [&id, this](uint8_t* buf, size_t buf_size) -> size_t { @@ -2340,7 +2355,7 @@ void P2PServer::P2PClient::on_block_notify(const uint8_t* buf) }); if (result) { - m_blockPendingRequests.push_back(id); + m_blockPendingRequests.push_back(*id.u64()); } } } @@ -2449,8 +2464,7 @@ void P2PServer::P2PClient::handle_incoming_block(p2pool* pool, PoolBlock& block, LOGWARN(3, "peer " << static_cast(m_addrString) << " banned for " << DEFAULT_BAN_TIME << " seconds"); } else { - const log::hex_buf addr_hex(addr.data, sizeof(addr.data)); - LOGWARN(3, "IP " << addr_hex << " banned for " << DEFAULT_BAN_TIME << " seconds"); + LOGWARN(3, addr << " banned for " << DEFAULT_BAN_TIME << " seconds"); } P2PServer* server = pool->p2p_server(); @@ -2515,7 +2529,7 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count return; } - m_blockPendingRequests.push_back(id); + m_blockPendingRequests.push_back(*id.u64()); } } diff --git a/src/p2p_server.h b/src/p2p_server.h index 90fa327..2c9a842 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -19,7 +19,7 @@ #include "tcp_server.h" #include "pool_block.h" -#include +#include namespace p2pool { @@ -108,7 +108,7 @@ public: void on_after_handshake(uint8_t* &p); bool on_listen_port(const uint8_t* buf); bool on_block_request(const uint8_t* buf); - bool on_block_response(const uint8_t* buf, uint32_t size, const hash& expected_id); + bool on_block_response(const uint8_t* buf, uint32_t size, uint64_t expected_id); bool on_block_broadcast(const uint8_t* buf, uint32_t size, bool compact); bool on_peer_list_request(const uint8_t* buf); void on_peer_list_response(const uint8_t* buf); @@ -146,7 +146,7 @@ public: int64_t m_pingTime; - std::list m_blockPendingRequests; + std::deque m_blockPendingRequests; uint64_t m_lastAlive; uint64_t m_lastBroadcastTimestamp; @@ -254,6 +254,7 @@ private: bool m_lookForMissingBlocks; unordered_set> m_missingBlockRequests; + unordered_set m_blockNotifyRequests; P2PClient* m_fastestPeer; diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 9158b9b..2500c08 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -455,7 +455,7 @@ bool SideChain::get_shares(const PoolBlock* tip, std::vector& shares hash h; keccak(tip->m_txkeySecSeed.h, HASH_SIZE, h.h); - uint64_t seed = *reinterpret_cast(h.h); + uint64_t seed = *h.u64(); if (seed == 0) seed = 1; for (uint64_t i = 0, k; i < n - 1; ++i) { diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index db655b5..6fc558e 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -293,7 +293,7 @@ bool StratumServer::on_login(StratumClient* client, uint32_t id, const char* log client->m_rpcId = static_cast(client->m_owner)->get_random32(); } while (!client->m_rpcId); - log::hex_buf target_hex(reinterpret_cast(&target), sizeof(uint64_t)); + log::hex_buf target_hex(&target); if (target >= TARGET_4_BYTES_LIMIT) { target_hex.m_data += sizeof(uint32_t); @@ -775,7 +775,7 @@ void StratumServer::on_blobs_ready() const bool result = send(client, [data, target, hashing_blob, job_id](uint8_t* buf, size_t buf_size) { - log::hex_buf target_hex(reinterpret_cast(&target), sizeof(uint64_t)); + log::hex_buf target_hex(&target); if (target >= TARGET_4_BYTES_LIMIT) { target_hex.m_data += sizeof(uint32_t); @@ -930,7 +930,7 @@ void StratumServer::on_share_found(uv_work_t* req) } // Send the response to miner - const uint64_t value = *reinterpret_cast(share->m_resultHash.h + HASH_SIZE - sizeof(uint64_t)); + const uint64_t value = share->m_resultHash.u64()[HASH_SIZE / sizeof(uint64_t) - 1]; if (LIKELY(value < target)) { const uint64_t timestamp = share->m_timestamp; diff --git a/src/util.cpp b/src/util.cpp index 6eab49e..dda71ad 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -141,7 +141,7 @@ NOINLINE difficulty_type& difficulty_type::operator/=(difficulty_type b) NOINLINE bool difficulty_type::check_pow(const hash& pow_hash) const { - const uint64_t* a = reinterpret_cast(pow_hash.h); + const uint64_t* a = pow_hash.u64(); uint64_t result[6] = {}; uint64_t product[6] = {};