P2PServer: refactored BLOCK_NOTIFY logic

- Limit how many block requests can be in flight
- Don't send requests for the same block twice
This commit is contained in:
SChernykh 2023-07-07 11:00:19 +02:00
parent 3e269b49d4
commit d8ecc1174d
7 changed files with 50 additions and 29 deletions

View file

@ -152,8 +152,8 @@ struct alignas(uint64_t) hash
FORCEINLINE bool operator<(const hash& other) const
{
const uint64_t* a = reinterpret_cast<const uint64_t*>(h);
const uint64_t* b = reinterpret_cast<const uint64_t*>(other.h);
const uint64_t* a = u64();
const uint64_t* b = other.u64();
if (a[3] < b[3]) return true;
if (a[3] > b[3]) return false;
@ -169,18 +169,21 @@ struct alignas(uint64_t) hash
FORCEINLINE bool operator==(const hash& other) const
{
const uint64_t* a = reinterpret_cast<const uint64_t*>(h);
const uint64_t* b = reinterpret_cast<const uint64_t*>(other.h);
const uint64_t* a = u64();
const uint64_t* b = other.u64();
return (a[0] == b[0]) && (a[1] == b[1]) && (a[2] == b[2]) && (a[3] == b[3]);
}
FORCEINLINE bool operator!=(const hash& other) const { return !operator==(other); }
FORCEINLINE bool empty() const {
const uint64_t* a = reinterpret_cast<const uint64_t*>(h);
const uint64_t* a = u64();
return (a[0] == 0) && (a[1] == 0) && (a[2] == 0) && (a[3] == 0);
}
FORCEINLINE uint64_t* u64() { return reinterpret_cast<uint64_t*>(h); }
FORCEINLINE const uint64_t* u64() const { return reinterpret_cast<const uint64_t*>(h); }
friend std::ostream& operator<<(std::ostream& s, const hash& d);
friend std::istream& operator>>(std::istream& s, hash& d);
};

View file

@ -289,6 +289,9 @@ struct hex_buf
{
FORCEINLINE hex_buf(const uint8_t* data, size_t size) : m_data(data), m_size(size) {}
template<typename T>
explicit FORCEINLINE hex_buf(const T* data) : m_data(reinterpret_cast<const uint8_t*>(data)), m_size(sizeof(T)) {}
const uint8_t* m_data;
size_t m_size;
};

View file

@ -1120,6 +1120,7 @@ void P2PServer::download_missing_blocks()
if (missing_blocks.empty()) {
m_lookForMissingBlocks = false;
m_missingBlockRequests.clear();
m_blockNotifyRequests.clear();
return;
}
@ -1146,8 +1147,12 @@ void P2PServer::download_missing_blocks()
for (const hash& id : missing_blocks) {
P2PClient* client = clients[get_random64() % clients.size()];
const uint64_t truncated_block_id = *reinterpret_cast<const uint64_t*>(id.h);
if (!m_missingBlockRequests.insert({ client->m_peerId, truncated_block_id }).second) {
if (client->m_blockPendingRequests.size() >= 25) {
// Too many pending requests to this peer
continue;
}
if (!m_missingBlockRequests.insert({ client->m_peerId, *id.u64() }).second) {
// We already asked this peer about this block
// Don't try to ask another peer, leave it for another timer tick
continue;
@ -1182,7 +1187,7 @@ void P2PServer::download_missing_blocks()
});
if (result) {
client->m_blockPendingRequests.push_back(id);
client->m_blockPendingRequests.push_back(*id.u64());
}
}
}
@ -1540,7 +1545,7 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size)
if (bytes_left >= 1 + sizeof(uint32_t) + block_size) {
bytes_read = 1 + sizeof(uint32_t) + block_size;
const hash expected_id = m_blockPendingRequests.front();
const uint64_t expected_id = m_blockPendingRequests.front();
m_blockPendingRequests.pop_front();
if (!on_block_response(buf + 1 + sizeof(uint32_t), block_size, expected_id)) {
@ -1770,7 +1775,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH
return;
}
const uint64_t* value = reinterpret_cast<uint64_t*>(work->solution.h);
const uint64_t* value = work->solution.u64();
uint64_t high;
umul128(value[HASH_SIZE / sizeof(uint64_t) - 1], CHALLENGE_DIFFICULTY, &high);
@ -1916,7 +1921,7 @@ bool P2PServer::P2PClient::on_handshake_solution(const uint8_t* buf)
// Check that incoming connection provided enough PoW
if (m_isIncoming) {
const uint64_t* value = reinterpret_cast<uint64_t*>(solution.h);
const uint64_t* value = solution.u64();
uint64_t high;
umul128(value[HASH_SIZE / sizeof(uint64_t) - 1], CHALLENGE_DIFFICULTY, &high);
@ -1977,7 +1982,7 @@ void P2PServer::P2PClient::on_after_handshake(uint8_t* &p)
memcpy(p, zero_hash.h, HASH_SIZE);
p += HASH_SIZE;
m_blockPendingRequests.push_back(zero_hash);
m_blockPendingRequests.push_back(0);
m_lastBroadcastTimestamp = seconds_since_epoch();
}
@ -2038,14 +2043,14 @@ bool P2PServer::P2PClient::on_block_request(const uint8_t* buf)
});
}
bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size, const hash& expected_id)
bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size, uint64_t expected_id)
{
P2PServer* server = static_cast<P2PServer*>(m_owner);
const uint64_t cur_time = seconds_since_epoch();
if (!size) {
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor() << " sent an empty block response");
if (expected_id.empty() && (cur_time >= m_nextOutgoingPeerListRequest)) {
if ((expected_id == 0) && (cur_time >= m_nextOutgoingPeerListRequest)) {
server->send_peer_list_request(this, cur_time);
}
return true;
@ -2064,7 +2069,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size,
const PoolBlock* block = server->get_block();
// Chain tip request
if (expected_id.empty()) {
if (expected_id == 0) {
const uint64_t peer_height = block->m_txinGenHeight;
const uint64_t our_height = server->m_pool->miner_data().height;
@ -2077,8 +2082,8 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size,
server->send_peer_list_request(this, cur_time);
}
}
else if (block->m_sidechainId != expected_id) {
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " sent a wrong block: expected " << expected_id << ", got " << block->m_sidechainId);
else if (*block->m_sidechainId.u64() != expected_id) {
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " sent a wrong block: expected " << log::hex_buf(&expected_id) << ", got " << block->m_sidechainId);
return false;
}
@ -2320,6 +2325,16 @@ void P2PServer::P2PClient::on_block_notify(const uint8_t* buf)
if (!server->find_block(id)) {
LOGINFO(5, "Received an unknown block " << id << " in BLOCK_NOTIFY");
if (m_blockPendingRequests.size() >= 25) {
LOGINFO(5, "Too many pending requests, ignoring it");
return;
}
if (!server->m_blockNotifyRequests.insert(*id.u64()).second || !server->m_missingBlockRequests.insert({ m_peerId, *id.u64() }).second) {
LOGINFO(5, "BLOCK_REQUEST for id = " << id << " was already sent");
return;
}
const bool result = server->send(this,
[&id, this](uint8_t* buf, size_t buf_size) -> size_t
{
@ -2340,7 +2355,7 @@ void P2PServer::P2PClient::on_block_notify(const uint8_t* buf)
});
if (result) {
m_blockPendingRequests.push_back(id);
m_blockPendingRequests.push_back(*id.u64());
}
}
}
@ -2449,8 +2464,7 @@ void P2PServer::P2PClient::handle_incoming_block(p2pool* pool, PoolBlock& block,
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " banned for " << DEFAULT_BAN_TIME << " seconds");
}
else {
const log::hex_buf addr_hex(addr.data, sizeof(addr.data));
LOGWARN(3, "IP " << addr_hex << " banned for " << DEFAULT_BAN_TIME << " seconds");
LOGWARN(3, addr << " banned for " << DEFAULT_BAN_TIME << " seconds");
}
P2PServer* server = pool->p2p_server();
@ -2515,7 +2529,7 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count
return;
}
m_blockPendingRequests.push_back(id);
m_blockPendingRequests.push_back(*id.u64());
}
}

View file

@ -19,7 +19,7 @@
#include "tcp_server.h"
#include "pool_block.h"
#include <list>
#include <deque>
namespace p2pool {
@ -108,7 +108,7 @@ public:
void on_after_handshake(uint8_t* &p);
bool on_listen_port(const uint8_t* buf);
bool on_block_request(const uint8_t* buf);
bool on_block_response(const uint8_t* buf, uint32_t size, const hash& expected_id);
bool on_block_response(const uint8_t* buf, uint32_t size, uint64_t expected_id);
bool on_block_broadcast(const uint8_t* buf, uint32_t size, bool compact);
bool on_peer_list_request(const uint8_t* buf);
void on_peer_list_response(const uint8_t* buf);
@ -146,7 +146,7 @@ public:
int64_t m_pingTime;
std::list<hash> m_blockPendingRequests;
std::deque<uint64_t> m_blockPendingRequests;
uint64_t m_lastAlive;
uint64_t m_lastBroadcastTimestamp;
@ -254,6 +254,7 @@ private:
bool m_lookForMissingBlocks;
unordered_set<std::pair<uint64_t, uint64_t>> m_missingBlockRequests;
unordered_set<uint64_t> m_blockNotifyRequests;
P2PClient* m_fastestPeer;

View file

@ -455,7 +455,7 @@ bool SideChain::get_shares(const PoolBlock* tip, std::vector<MinerShare>& shares
hash h;
keccak(tip->m_txkeySecSeed.h, HASH_SIZE, h.h);
uint64_t seed = *reinterpret_cast<uint64_t*>(h.h);
uint64_t seed = *h.u64();
if (seed == 0) seed = 1;
for (uint64_t i = 0, k; i < n - 1; ++i) {

View file

@ -293,7 +293,7 @@ bool StratumServer::on_login(StratumClient* client, uint32_t id, const char* log
client->m_rpcId = static_cast<StratumServer*>(client->m_owner)->get_random32();
} while (!client->m_rpcId);
log::hex_buf target_hex(reinterpret_cast<const uint8_t*>(&target), sizeof(uint64_t));
log::hex_buf target_hex(&target);
if (target >= TARGET_4_BYTES_LIMIT) {
target_hex.m_data += sizeof(uint32_t);
@ -775,7 +775,7 @@ void StratumServer::on_blobs_ready()
const bool result = send(client,
[data, target, hashing_blob, job_id](uint8_t* buf, size_t buf_size)
{
log::hex_buf target_hex(reinterpret_cast<const uint8_t*>(&target), sizeof(uint64_t));
log::hex_buf target_hex(&target);
if (target >= TARGET_4_BYTES_LIMIT) {
target_hex.m_data += sizeof(uint32_t);
@ -930,7 +930,7 @@ void StratumServer::on_share_found(uv_work_t* req)
}
// Send the response to miner
const uint64_t value = *reinterpret_cast<uint64_t*>(share->m_resultHash.h + HASH_SIZE - sizeof(uint64_t));
const uint64_t value = share->m_resultHash.u64()[HASH_SIZE / sizeof(uint64_t) - 1];
if (LIKELY(value < target)) {
const uint64_t timestamp = share->m_timestamp;

View file

@ -141,7 +141,7 @@ NOINLINE difficulty_type& difficulty_type::operator/=(difficulty_type b)
NOINLINE bool difficulty_type::check_pow(const hash& pow_hash) const
{
const uint64_t* a = reinterpret_cast<const uint64_t*>(pow_hash.h);
const uint64_t* a = pow_hash.u64();
uint64_t result[6] = {};
uint64_t product[6] = {};