P2PServer: avoid unnecessary block broadcasts

This commit is contained in:
SChernykh 2023-07-04 16:06:01 +02:00
parent 5444701c38
commit d6cb0ee8a0
2 changed files with 45 additions and 8 deletions

View file

@ -883,12 +883,28 @@ void P2PServer::on_broadcast()
{ {
uint8_t* p = buf; uint8_t* p = buf;
bool send_pruned = true;
bool send_compact = (client->m_protocolVersion >= PROTOCOL_VERSION_1_1) && !data->compact_blob.empty() && (data->compact_blob.size() < data->pruned_blob.size());
const hash* a = client->m_broadcastedHashes; const hash* a = client->m_broadcastedHashes;
const hash* b = client->m_broadcastedHashes + array_size(&P2PClient::m_broadcastedHashes); const hash* b = client->m_broadcastedHashes + array_size(&P2PClient::m_broadcastedHashes);
// If this peer already broadcasted this block to us, we don't need to broadcast it back, we just need to notify the peer
if ((client->m_protocolVersion >= PROTOCOL_VERSION_1_2) && (std::find(a, b, data->id) != b)) {
LOGINFO(5, "sending BLOCK_NOTIFY to " << log::Gray() << static_cast<char*>(client->m_addrString));
if (buf_size < 1 + HASH_SIZE) {
return 0;
}
*(p++) = static_cast<uint8_t>(MessageId::BLOCK_NOTIFY);
memcpy(p, data->id.h, HASH_SIZE);
p += HASH_SIZE;
return p - buf;
}
bool send_pruned = true;
bool send_compact = (client->m_protocolVersion >= PROTOCOL_VERSION_1_1) && !data->compact_blob.empty() && (data->compact_blob.size() < data->pruned_blob.size());
for (const hash& id : data->ancestor_hashes) { for (const hash& id : data->ancestor_hashes) {
if (std::find(a, b, id) == b) { if (std::find(a, b, id) == b) {
send_pruned = false; send_pruned = false;
@ -898,7 +914,7 @@ void P2PServer::on_broadcast()
} }
if (send_pruned) { if (send_pruned) {
LOGINFO(6, "sending BLOCK_BROADCAST (" << (send_compact ? "compact" : "pruned") << ") to " << log::Gray() << static_cast<char*>(client->m_addrString)); LOGINFO(6, "sending BLOCK_BROADCAST " << (send_compact ? "(compact)" : "(pruned) ") << ") to " << log::Gray() << static_cast<char*>(client->m_addrString));
const std::vector<uint8_t>& blob = send_compact ? data->compact_blob : data->pruned_blob; const std::vector<uint8_t>& blob = send_compact ? data->compact_blob : data->pruned_blob;
const uint32_t len = static_cast<uint32_t>(blob.size()); const uint32_t len = static_cast<uint32_t>(blob.size());
@ -917,7 +933,7 @@ void P2PServer::on_broadcast()
} }
} }
else { else {
LOGINFO(5, "sending BLOCK_BROADCAST (full) to " << log::Gray() << static_cast<char*>(client->m_addrString)); LOGINFO(5, "sending BLOCK_BROADCAST (full) to " << log::Gray() << static_cast<char*>(client->m_addrString));
const uint32_t len = static_cast<uint32_t>(data->blob.size()); const uint32_t len = static_cast<uint32_t>(data->blob.size());
if (buf_size < 1 + sizeof(uint32_t) + len) { if (buf_size < 1 + sizeof(uint32_t) + len) {
@ -1240,6 +1256,7 @@ P2PServer::P2PClient::P2PClient()
, m_lastBroadcastTimestamp(0) , m_lastBroadcastTimestamp(0)
, m_lastBlockrequestTimestamp(0) , m_lastBlockrequestTimestamp(0)
, m_broadcastedHashes{} , m_broadcastedHashes{}
, m_broadcastedHashesIndex(0)
{ {
m_p2pReadBuf[0] = '\0'; m_p2pReadBuf[0] = '\0';
} }
@ -1586,6 +1603,15 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size)
} }
} }
break; break;
case MessageId::BLOCK_NOTIFY:
LOGINFO(6, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor() << " sent BLOCK_NOTIFY");
if (bytes_left >= 1 + HASH_SIZE) {
bytes_read = 1 + HASH_SIZE;
on_block_notify(buf + 1);
}
break;
} }
if (bytes_read) { if (bytes_read) {
@ -2072,7 +2098,7 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size,
const PoolBlock* block = server->get_block(); const PoolBlock* block = server->get_block();
m_broadcastMaxHeight = std::max(m_broadcastMaxHeight, block->m_sidechainHeight); m_broadcastMaxHeight = std::max(m_broadcastMaxHeight, block->m_sidechainHeight);
m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = block->m_sidechainId; m_broadcastedHashes[m_broadcastedHashesIndex++ % array_size(&P2PClient::m_broadcastedHashes)] = block->m_sidechainId;
MinerData miner_data = server->m_pool->miner_data(); MinerData miner_data = server->m_pool->miner_data();
@ -2269,6 +2295,14 @@ void P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf)
} }
} }
void P2PServer::P2PClient::on_block_notify(const uint8_t* buf)
{
hash id;
memcpy(id.h, buf, HASH_SIZE);
m_broadcastedHashes[m_broadcastedHashesIndex++ % array_size(&P2PClient::m_broadcastedHashes)] = id;
}
bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta) bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta)
{ {
P2PServer* server = static_cast<P2PServer*>(m_owner); P2PServer* server = static_cast<P2PServer*>(m_owner);

View file

@ -37,8 +37,9 @@ static constexpr int DEFAULT_P2P_PORT_MINI = 37888;
static constexpr uint32_t PROTOCOL_VERSION_1_0 = 0x00010000UL; static constexpr uint32_t PROTOCOL_VERSION_1_0 = 0x00010000UL;
static constexpr uint32_t PROTOCOL_VERSION_1_1 = 0x00010001UL; static constexpr uint32_t PROTOCOL_VERSION_1_1 = 0x00010001UL;
static constexpr uint32_t PROTOCOL_VERSION_1_2 = 0x00010002UL;
static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_1; static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_2;
class P2PServer : public TCPServer class P2PServer : public TCPServer
{ {
@ -53,6 +54,7 @@ public:
PEER_LIST_REQUEST = 6, PEER_LIST_REQUEST = 6,
PEER_LIST_RESPONSE = 7, PEER_LIST_RESPONSE = 7,
BLOCK_BROADCAST_COMPACT = 8, BLOCK_BROADCAST_COMPACT = 8,
BLOCK_NOTIFY = 9,
}; };
explicit P2PServer(p2pool *pool); explicit P2PServer(p2pool *pool);
@ -109,6 +111,7 @@ public:
bool on_block_broadcast(const uint8_t* buf, uint32_t size, bool compact); bool on_block_broadcast(const uint8_t* buf, uint32_t size, bool compact);
bool on_peer_list_request(const uint8_t* buf); bool on_peer_list_request(const uint8_t* buf);
void on_peer_list_response(const uint8_t* buf); void on_peer_list_response(const uint8_t* buf);
void on_block_notify(const uint8_t* buf);
bool handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta = 0); bool handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta = 0);
void handle_incoming_block(p2pool* pool, PoolBlock& block, const uint32_t reset_counter, bool is_v6, const raw_ip& addr, std::vector<hash>& missing_blocks); void handle_incoming_block(p2pool* pool, PoolBlock& block, const uint32_t reset_counter, bool is_v6, const raw_ip& addr, std::vector<hash>& missing_blocks);
@ -149,7 +152,7 @@ public:
uint64_t m_lastBlockrequestTimestamp; uint64_t m_lastBlockrequestTimestamp;
hash m_broadcastedHashes[8]; hash m_broadcastedHashes[8];
std::atomic<uint32_t> m_broadcastedHashesIndex{ 0 }; uint32_t m_broadcastedHashesIndex;
}; };
void broadcast(const PoolBlock& block, const PoolBlock* parent); void broadcast(const PoolBlock& block, const PoolBlock* parent);