From c135787620bf41fd593f2797a05e37c835615bc4 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Wed, 9 Nov 2022 11:17:35 +0100 Subject: [PATCH 1/3] Prepare compact blob for block broadcasts Reference transactions by index in the parent block instead of storing full 32 bytes per transaction --- src/p2p_server.cpp | 39 +++++++++++++++++++++++++++++++++++++-- src/p2p_server.h | 3 ++- src/side_chain.cpp | 4 ++-- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index ad62308..8220cf4 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -726,7 +726,7 @@ void P2PServer::remove_peer_from_list(const raw_ip& ip) } } -void P2PServer::broadcast(const PoolBlock& block) +void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent) { MinerData miner_data = m_pool->miner_data(); @@ -766,13 +766,48 @@ void P2PServer::broadcast(const PoolBlock& block) writeVarint(outputs_blob_size, data->pruned_blob); data->pruned_blob.insert(data->pruned_blob.end(), mainchain_data.begin() + outputs_offset + outputs_blob_size, mainchain_data.end()); + + const size_t N = block.m_transactions.size(); + if ((N > 1) && parent) { + unordered_map parent_transactions; + parent_transactions.reserve(parent->m_transactions.size()); + + for (size_t i = 1; i < parent->m_transactions.size(); ++i) { + parent_transactions.emplace(parent->m_transactions[i], i); + } + + // Reserve 1 additional byte per transaction to be ready for the worst case (all transactions are different in the parent block) + data->compact_blob.reserve(data->pruned_blob.capacity() + (N - 1)); + + // Copy pruned_blob without the transaction list + data->compact_blob.assign(data->pruned_blob.begin(), data->pruned_blob.end() - (N - 1) * HASH_SIZE); + + // Process transaction hashes one by one + size_t num_found = 0; + for (size_t i = 1; i < N; ++i) { + const hash& tx = block.m_transactions[i]; + auto it = parent_transactions.find(tx); + if (it != parent_transactions.end()) { + writeVarint(it->second, data->compact_blob); + ++num_found; + } + else { + data->compact_blob.push_back(0); + data->compact_blob.insert(data->compact_blob.end(), tx.h, tx.h + HASH_SIZE); + } + } + LOGINFO(6, "compact blob: " << num_found << '/' << (N - 1) << " transactions were found in the parent block"); + + data->compact_blob.insert(data->compact_blob.end(), sidechain_data.begin(), sidechain_data.end()); + } + data->pruned_blob.insert(data->pruned_blob.end(), sidechain_data.begin(), sidechain_data.end()); data->ancestor_hashes.reserve(block.m_uncles.size() + 1); data->ancestor_hashes = block.m_uncles; data->ancestor_hashes.push_back(block.m_parent); - LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (pruned/full)"); + LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->compact_blob.size() << '/' << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (compact/pruned/full)"); { MutexLock lock(m_broadcastLock); diff --git a/src/p2p_server.h b/src/p2p_server.h index da278c0..930a951 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -130,7 +130,7 @@ public: std::atomic m_broadcastedHashesIndex{ 0 }; }; - void broadcast(const PoolBlock& block); + void broadcast(const PoolBlock& block, const PoolBlock* parent); uint64_t get_random64(); uint64_t get_peerId() const { return m_peerId; } @@ -212,6 +212,7 @@ private: { std::vector blob; std::vector pruned_blob; + std::vector compact_blob; std::vector ancestor_hashes; }; diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 038b39b..c8898a0 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -1275,7 +1275,7 @@ void SideChain::verify_loop(PoolBlock* block) if (block->m_wantBroadcast && !block->m_broadcasted) { block->m_broadcasted = true; if (server && (block->m_depth < UNCLE_BLOCK_DEPTH)) { - server->broadcast(*block); + server->broadcast(*block, get_parent(block)); } } @@ -1674,7 +1674,7 @@ void SideChain::update_chain_tip(const PoolBlock* block) if (p2pServer() && block->m_wantBroadcast && !block->m_broadcasted) { block->m_broadcasted = true; - p2pServer()->broadcast(*block); + p2pServer()->broadcast(*block, get_parent(block)); } } From 47ff7e228ee8b81b752fc2881e9e9ae32d6c2f43 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Wed, 9 Nov 2022 15:29:53 +0100 Subject: [PATCH 2/3] Added code to deserialize compact blobs --- src/block_cache.cpp | 2 +- src/block_template.cpp | 4 +- src/p2p_server.cpp | 10 ++-- src/p2p_server.h | 2 +- src/pool_block.h | 2 +- src/pool_block_parser.inl | 99 +++++++++++++++++++++++++++------- tests/src/pool_block_tests.cpp | 4 +- 7 files changed, 91 insertions(+), 32 deletions(-) diff --git a/src/block_cache.cpp b/src/block_cache.cpp index e500856..0a3346e 100644 --- a/src/block_cache.cpp +++ b/src/block_cache.cpp @@ -201,7 +201,7 @@ void BlockCache::load_all(SideChain& side_chain, P2PServer& server) continue; } - if (block.deserialize(data + sizeof(uint32_t), n, side_chain, uv_default_loop_checked()) == 0) { + if (block.deserialize(data + sizeof(uint32_t), n, side_chain, uv_default_loop_checked(), false) == 0) { server.add_cached_block(block); ++blocks_loaded; } diff --git a/src/block_template.cpp b/src/block_template.cpp index 38f6ea8..22df40d 100644 --- a/src/block_template.cpp +++ b/src/block_template.cpp @@ -568,7 +568,7 @@ void BlockTemplate::update(const MinerData& data, const Mempool& mempool, Wallet buf.insert(buf.end(), sidechain_data.begin(), sidechain_data.end()); PoolBlock check; - const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain(), nullptr); + const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain(), nullptr, false); if (result != 0) { LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result); } @@ -1104,7 +1104,7 @@ void BlockTemplate::submit_sidechain_block(uint32_t template_id, uint32_t nonce, buf.insert(buf.end(), sidechain_data.begin(), sidechain_data.end()); PoolBlock check; - const int result = check.deserialize(buf.data(), buf.size(), side_chain, nullptr); + const int result = check.deserialize(buf.data(), buf.size(), side_chain, nullptr, false); if (result != 0) { LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result); } diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 8220cf4..a21fbd4 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -768,7 +768,7 @@ void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent) data->pruned_blob.insert(data->pruned_blob.end(), mainchain_data.begin() + outputs_offset + outputs_blob_size, mainchain_data.end()); const size_t N = block.m_transactions.size(); - if ((N > 1) && parent) { + if ((N > 1) && parent && (parent->m_transactions.size() > 1)) { unordered_map parent_transactions; parent_transactions.reserve(parent->m_transactions.size()); @@ -975,7 +975,7 @@ int P2PServer::listen_port() const return params.m_p2pExternalPort ? params.m_p2pExternalPort : m_listenPort; } -int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size) +int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size, bool compact) { int result; @@ -984,7 +984,7 @@ int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size) result = m_blockDeserializeResult; } else { - result = m_block->deserialize(buf, size, m_pool->side_chain(), &m_loop); + result = m_block->deserialize(buf, size, m_pool->side_chain(), &m_loop, compact); m_blockDeserializeBuf.assign(buf, buf + size); m_blockDeserializeResult = result; m_lookForMissingBlocks = true; @@ -1898,7 +1898,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) MutexLock lock(server->m_blockLock); - const int result = server->deserialize_block(buf, size); + const int result = server->deserialize_block(buf, size, false); if (result != 0) { LOGWARN(3, "peer " << static_cast(m_addrString) << " sent an invalid block, error " << result); return false; @@ -1938,7 +1938,7 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) MutexLock lock(server->m_blockLock); - const int result = server->deserialize_block(buf, size); + const int result = server->deserialize_block(buf, size, false); if (result != 0) { LOGWARN(3, "peer " << static_cast(m_addrString) << " sent an invalid block, error " << result); return false; diff --git a/src/p2p_server.h b/src/p2p_server.h index 930a951..5bf48de 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -146,7 +146,7 @@ public: void set_max_outgoing_peers(uint32_t n) { m_maxOutgoingPeers = std::min(std::max(n, 10U), 450U); } void set_max_incoming_peers(uint32_t n) { m_maxIncomingPeers = std::min(std::max(n, 10U), 450U); } - int deserialize_block(const uint8_t* buf, uint32_t size); + int deserialize_block(const uint8_t* buf, uint32_t size, bool compact); const PoolBlock* get_block() const { return m_block; } private: diff --git a/src/pool_block.h b/src/pool_block.h index f17c24e..d6043cf 100644 --- a/src/pool_block.h +++ b/src/pool_block.h @@ -135,7 +135,7 @@ struct PoolBlock std::vector serialize_mainchain_data_nolock(size_t* header_size, size_t* miner_tx_size, int* outputs_offset, int* outputs_blob_size) const; std::vector serialize_sidechain_data() const; - int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop); + int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop, bool compact); void reset_offchain_data(); bool get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash); diff --git a/src/pool_block_parser.inl b/src/pool_block_parser.inl index 3d21626..7ff33a1 100644 --- a/src/pool_block_parser.inl +++ b/src/pool_block_parser.inl @@ -23,7 +23,7 @@ namespace p2pool { // Since data here can come from external and possibly malicious sources, check everything // Only the syntax (i.e. the serialized block binary format) and the keccak hash are checked here // Semantics must also be checked elsewhere before accepting the block (PoW, reward split between miners, difficulty calculation and so on) -int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop) +int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop, bool compact) { try { // Sanity check @@ -193,23 +193,60 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si uint64_t num_transactions; READ_VARINT(num_transactions); - if (num_transactions > std::numeric_limits::max() / HASH_SIZE) return __LINE__; - if (static_cast(data_end - data) < num_transactions * HASH_SIZE) return __LINE__; + const int transactions_offset = static_cast(data - data_begin); - m_transactions.resize(1); - m_transactions.reserve(num_transactions + 1); + std::vector parent_indices; + if (compact) { + if (static_cast(data_end - data) < num_transactions) return __LINE__; - for (uint64_t i = 0; i < num_transactions; ++i) { - hash id; - READ_BUF(id.h, HASH_SIZE); - m_transactions.emplace_back(std::move(id)); + m_transactions.resize(1); + parent_indices.resize(1); + + // limit reserved memory size because we can't check "num_transactions" properly here + const uint64_t k = std::min(num_transactions + 1, 256); + m_transactions.reserve(k); + parent_indices.reserve(k); + + for (uint64_t i = 0; i < num_transactions; ++i) { + uint64_t parent_index; + READ_VARINT(parent_index); + + hash id; + if (parent_index == 0) { + READ_BUF(id.h, HASH_SIZE); + } + + m_transactions.emplace_back(id); + parent_indices.emplace_back(parent_index); + } + } + else { + if (num_transactions > std::numeric_limits::max() / HASH_SIZE) return __LINE__; + if (static_cast(data_end - data) < num_transactions * HASH_SIZE) return __LINE__; + + m_transactions.resize(1); + m_transactions.reserve(num_transactions + 1); + + for (uint64_t i = 0; i < num_transactions; ++i) { + hash id; + READ_BUF(id.h, HASH_SIZE); + m_transactions.emplace_back(id); + } } + const int transactions_actual_blob_size = static_cast(data - data_begin) - transactions_offset; + const int transactions_blob_size = static_cast(num_transactions) * HASH_SIZE; + const int transactions_blob_size_diff = transactions_blob_size - transactions_actual_blob_size; + + m_transactions.shrink_to_fit(); + #if POOL_BLOCK_DEBUG - m_mainChainDataDebug.reserve((data - data_begin) + outputs_blob_size_diff); + m_mainChainDataDebug.reserve((data - data_begin) + outputs_blob_size_diff + transactions_blob_size_diff); m_mainChainDataDebug.assign(data_begin, data_begin + outputs_offset); m_mainChainDataDebug.insert(m_mainChainDataDebug.end(), outputs_blob_size, 0); - m_mainChainDataDebug.insert(m_mainChainDataDebug.end(), data_begin + outputs_offset + outputs_actual_blob_size, data); + m_mainChainDataDebug.insert(m_mainChainDataDebug.end(), data_begin + outputs_offset + outputs_actual_blob_size, data_begin + transactions_offset); + m_mainChainDataDebug.insert(m_mainChainDataDebug.end(), transactions_blob_size, 0); + m_mainChainDataDebug.insert(m_mainChainDataDebug.end(), data_begin + transactions_offset + transactions_actual_blob_size, data); const uint8_t* sidechain_data_begin = data; #endif @@ -239,6 +276,23 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si READ_BUF(m_parent.h, HASH_SIZE); + if (compact) { + const PoolBlock* parent = sidechain.find_block(m_parent); + if (!parent) { + return __LINE__; + } + + for (uint64_t i = 1, n = m_transactions.size(); i < n; ++i) { + const uint64_t parent_index = parent_indices[i]; + if (parent_index) { + if (parent_index >= parent->m_transactions.size()) { + return __LINE__; + } + m_transactions[i] = parent->m_transactions[parent_index]; + } + } + } + uint64_t num_uncles; READ_VARINT(num_uncles); @@ -251,7 +305,7 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si for (uint64_t i = 0; i < num_uncles; ++i) { hash id; READ_BUF(id.h, HASH_SIZE); - m_uncles.emplace_back(std::move(id)); + m_uncles.emplace_back(id); } READ_VARINT(m_sidechainHeight); @@ -279,14 +333,18 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si return __LINE__; } + const uint8_t* transactions_blob = reinterpret_cast(m_transactions.data() + 1); + #if POOL_BLOCK_DEBUG memcpy(m_mainChainDataDebug.data() + outputs_offset, outputs_blob.data(), outputs_blob_size); + memcpy(m_mainChainDataDebug.data() + transactions_offset + outputs_blob_size_diff, transactions_blob, transactions_blob_size); #endif hash check; const std::vector& consensus_id = sidechain.consensus_id(); + const int data_size = static_cast((data_end - data_begin) + outputs_blob_size_diff + transactions_blob_size_diff); keccak_custom( - [nonce_offset, extra_nonce_offset, sidechain_hash_offset, data_begin, data_end, &consensus_id, &outputs_blob, outputs_blob_size_diff, outputs_offset, outputs_blob_size](int offset) -> uint8_t + [nonce_offset, extra_nonce_offset, sidechain_hash_offset, data_begin, data_size, &consensus_id, &outputs_blob, outputs_blob_size_diff, outputs_offset, outputs_blob_size, transactions_blob, transactions_blob_size_diff, transactions_offset, transactions_blob_size](int offset) -> uint8_t { uint32_t k = static_cast(offset - nonce_offset); if (k < NONCE_SIZE) { @@ -303,24 +361,25 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si return 0; } - const int data_size = static_cast((data_end - data_begin) + outputs_blob_size_diff); if (offset < data_size) { if (offset < outputs_offset) { return data_begin[offset]; } else if (offset < outputs_offset + outputs_blob_size) { - const int tmp = offset - outputs_offset; - return outputs_blob[tmp]; + return outputs_blob[offset - outputs_offset]; } - else { + else if (offset < transactions_offset + outputs_blob_size_diff) { return data_begin[offset - outputs_blob_size_diff]; } + else if (offset < transactions_offset + outputs_blob_size_diff + transactions_blob_size) { + return transactions_blob[offset - (transactions_offset + outputs_blob_size_diff)]; + } + return data_begin[offset - outputs_blob_size_diff - transactions_blob_size_diff]; } - offset -= data_size; - return consensus_id[offset]; + return consensus_id[offset - data_size]; }, - static_cast(size + outputs_blob_size_diff + consensus_id.size()), check.h, HASH_SIZE); + static_cast(size + outputs_blob_size_diff + transactions_blob_size_diff + consensus_id.size()), check.h, HASH_SIZE); if (check != m_sidechainId) { return __LINE__; diff --git a/tests/src/pool_block_tests.cpp b/tests/src/pool_block_tests.cpp index a34d88b..cbe4378 100644 --- a/tests/src/pool_block_tests.cpp +++ b/tests/src/pool_block_tests.cpp @@ -49,7 +49,7 @@ TEST(pool_block, deserialize) f.read(reinterpret_cast(buf.data()), buf.size()); ASSERT_EQ(f.good(), true); - ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain, nullptr), 0); + ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain, nullptr, false), 0); size_t header_size, miner_tx_size; int outputs_offset, outputs_blob_size; @@ -137,7 +137,7 @@ TEST(pool_block, verify) p += sizeof(uint32_t); ASSERT_TRUE(p + n <= e); - ASSERT_EQ(b.deserialize(p, n, sidechain, nullptr), 0); + ASSERT_EQ(b.deserialize(p, n, sidechain, nullptr, false), 0); p += n; sidechain.add_block(b); From 25806b67a2162f62c9fe749bedb7a3e85d4bbb06 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Wed, 9 Nov 2022 19:03:58 +0100 Subject: [PATCH 3/3] Added protocol version negotiation Protocol version 1.1: compact block broadcasts --- src/p2p_server.cpp | 76 ++++++++++++++++++++++++++++++++-------------- src/p2p_server.h | 13 ++++++-- 2 files changed, 65 insertions(+), 24 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index a21fbd4..27e231d 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -875,6 +875,7 @@ void P2PServer::on_broadcast() uint8_t* p = p0; bool send_pruned = true; + bool send_compact = (client->m_protocolVersion >= PROTOCOL_VERSION_1_1) && !data->compact_blob.empty() && (data->compact_blob.size() < data->pruned_blob.size()); const hash* a = client->m_broadcastedHashes; const hash* b = client->m_broadcastedHashes + array_size(&P2PClient::m_broadcastedHashes); @@ -882,25 +883,27 @@ void P2PServer::on_broadcast() for (const hash& id : data->ancestor_hashes) { if (std::find(a, b, id) == b) { send_pruned = false; + send_compact = false; break; } } if (send_pruned) { - LOGINFO(6, "sending BLOCK_BROADCAST (pruned) to " << log::Gray() << static_cast(client->m_addrString)); + LOGINFO(6, "sending BLOCK_BROADCAST (" << (send_compact ? "compact" : "pruned") << ") to " << log::Gray() << static_cast(client->m_addrString)); + const std::vector& blob = send_compact ? data->compact_blob : data->pruned_blob; - const uint32_t len = static_cast(data->pruned_blob.size()); + const uint32_t len = static_cast(blob.size()); if (buf_size < SEND_BUF_MIN_SIZE + 1 + sizeof(uint32_t) + len) { return 0; } - *(p++) = static_cast(MessageId::BLOCK_BROADCAST); + *(p++) = static_cast(send_compact ? MessageId::BLOCK_BROADCAST_COMPACT : MessageId::BLOCK_BROADCAST); memcpy(p, &len, sizeof(uint32_t)); p += sizeof(uint32_t); if (len) { - memcpy(p, data->pruned_blob.data(), len); + memcpy(p, blob.data(), len); p += len; } } @@ -1169,6 +1172,7 @@ P2PServer::P2PClient::P2PClient() , m_nextOutgoingPeerListRequest(0) , m_lastPeerListRequestTime{} , m_peerListPendingRequests(0) + , m_protocolVersion(PROTOCOL_VERSION_1_0) , m_pingTime(-1) , m_blockPendingRequests(0) , m_chainTipBlockRequest(false) @@ -1214,6 +1218,7 @@ void P2PServer::P2PClient::reset() m_nextOutgoingPeerListRequest = 0; m_lastPeerListRequestTime = {}; m_peerListPendingRequests = 0; + m_protocolVersion = PROTOCOL_VERSION_1_0; m_pingTime = -1; m_blockPendingRequests = 0; m_chainTipBlockRequest = false; @@ -1400,16 +1405,20 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size) break; case MessageId::BLOCK_BROADCAST: - LOGINFO(6, "peer " << log::Gray() << static_cast(m_addrString) << log::NoColor() << " sent BLOCK_BROADCAST"); + case MessageId::BLOCK_BROADCAST_COMPACT: + { + const bool compact = (id == MessageId::BLOCK_BROADCAST_COMPACT); + LOGINFO(6, "peer " << log::Gray() << static_cast(m_addrString) << log::NoColor() << " sent " << (compact ? "BLOCK_BROADCAST_COMPACT" : "BLOCK_BROADCAST")); - if (bytes_left >= 1 + sizeof(uint32_t)) { - const uint32_t block_size = read_unaligned(reinterpret_cast(buf + 1)); - if (bytes_left >= 1 + sizeof(uint32_t) + block_size) { - bytes_read = 1 + sizeof(uint32_t) + block_size; - if (!on_block_broadcast(buf + 1 + sizeof(uint32_t), block_size)) { - ban(DEFAULT_BAN_TIME); - server->remove_peer_from_list(this); - return false; + if (bytes_left >= 1 + sizeof(uint32_t)) { + const uint32_t block_size = read_unaligned(reinterpret_cast(buf + 1)); + if (bytes_left >= 1 + sizeof(uint32_t) + block_size) { + bytes_read = 1 + sizeof(uint32_t) + block_size; + if (!on_block_broadcast(buf + 1 + sizeof(uint32_t), block_size, compact)) { + ban(DEFAULT_BAN_TIME); + server->remove_peer_from_list(this); + return false; + } } } } @@ -1927,7 +1936,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) return handle_incoming_block_async(server->get_block(), max_time_delta); } -bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) +bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size, bool compact) { if (!size) { LOGWARN(3, "peer " << static_cast(m_addrString) << " broadcasted an empty block"); @@ -1938,7 +1947,7 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) MutexLock lock(server->m_blockLock); - const int result = server->deserialize_block(buf, size, false); + const int result = server->deserialize_block(buf, size, compact); if (result != 0) { LOGWARN(3, "peer " << static_cast(m_addrString) << " sent an invalid block, error " << result); return false; @@ -1993,6 +2002,7 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) { P2PServer* server = static_cast(m_owner); const uint64_t cur_time = seconds_since_epoch(); + const bool first = (m_prevIncomingPeerListRequest == 0); // Allow peer list requests no more than once every 30 seconds if (cur_time - m_prevIncomingPeerListRequest < 30) { @@ -2037,6 +2047,24 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) } } + // Protocol version message: + // - IPv4 address = 255.255.255.255 + // - port = 65535 + // - first 12 bytes of the 16-byte raw IP address are ignored by older clients if it's IPv4 + // - use first 4 bytes of the 16-byte raw IP address to send supported protocol version + if (first) { + LOGINFO(5, "sending protocol version " << (SUPPORTED_PROTOCOL_VERSION >> 16) << '.' << (SUPPORTED_PROTOCOL_VERSION & 0xFFFF) << " to peer " << log::Gray() << static_cast(m_addrString)); + + peers[0] = {}; + *reinterpret_cast(peers[0].m_addr.data) = SUPPORTED_PROTOCOL_VERSION; + *reinterpret_cast(peers[0].m_addr.data + 12) = 0xFFFFFFFFU; + peers[0].m_port = 0xFFFF; + + if (num_selected_peers == 0) { + num_selected_peers = 1; + } + } + return server->send(this, [this, &peers, num_selected_peers](void* buf, size_t buf_size) -> size_t { @@ -2068,7 +2096,7 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) }); } -bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const +bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) { P2PServer* server = static_cast(m_owner); const uint64_t cur_time = seconds_since_epoch(); @@ -2083,6 +2111,10 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const memcpy(ip.data, buf, sizeof(ip.data)); buf += sizeof(ip.data); + int port = 0; + memcpy(&port, buf, 2); + buf += 2; + // Treat IPv4-mapped addresses as regular IPv4 addresses if (is_v6 && ip.is_ipv4_prefix()) { is_v6 = false; @@ -2092,8 +2124,12 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const const uint32_t b = ip.data[12]; if ((b == 0) || (b >= 224)) { // Ignore 0.0.0.0/8 (special-purpose range for "this network") and 224.0.0.0/3 (IP multicast and reserved ranges) - // Some values in these ranges will be used to enable future P2Pool binary protocol versions - buf += 2; + + // Check for protocol version message + if ((*reinterpret_cast(ip.data + 12) == 0xFFFFFFFFU) && (port == 0xFFFF)) { + m_protocolVersion = *reinterpret_cast(ip.data); + LOGINFO(5, "peer " << log::Gray() << static_cast(m_addrString) << log::NoColor() << " supports protocol version " << (m_protocolVersion >> 16) << '.' << (m_protocolVersion & 0xFFFF)); + } continue; } @@ -2103,10 +2139,6 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const ip.data[11] = 0xFF; } - int port = 0; - memcpy(&port, buf, 2); - buf += 2; - bool already_added = false; for (Peer& p : server->m_peerList) { if ((p.m_isV6 == is_v6) && (p.m_addr == ip)) { diff --git a/src/p2p_server.h b/src/p2p_server.h index 5bf48de..69917c9 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -30,6 +30,11 @@ static constexpr size_t PEER_LIST_RESPONSE_MAX_PEERS = 16; static constexpr int DEFAULT_P2P_PORT = 37889; static constexpr int DEFAULT_P2P_PORT_MINI = 37888; +static constexpr uint32_t PROTOCOL_VERSION_1_0 = 0x00010000UL; +static constexpr uint32_t PROTOCOL_VERSION_1_1 = 0x00010001UL; + +static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_1; + class P2PServer : public TCPServer { public: @@ -42,6 +47,7 @@ public: BLOCK_BROADCAST = 5, PEER_LIST_REQUEST = 6, PEER_LIST_RESPONSE = 7, + BLOCK_BROADCAST_COMPACT = 8, }; explicit P2PServer(p2pool *pool); @@ -94,9 +100,9 @@ public: bool on_listen_port(const uint8_t* buf); bool on_block_request(const uint8_t* buf); bool on_block_response(const uint8_t* buf, uint32_t size); - bool on_block_broadcast(const uint8_t* buf, uint32_t size); + bool on_block_broadcast(const uint8_t* buf, uint32_t size, bool compact); bool on_peer_list_request(const uint8_t* buf); - bool on_peer_list_response(const uint8_t* buf) const; + bool on_peer_list_response(const uint8_t* buf); bool handle_incoming_block_async(const PoolBlock* block, uint64_t max_time_delta = 0); void handle_incoming_block(p2pool* pool, PoolBlock& block, const uint32_t reset_counter, const raw_ip& addr, std::vector& missing_blocks); @@ -117,6 +123,9 @@ public: uint64_t m_nextOutgoingPeerListRequest; std::chrono::high_resolution_clock::time_point m_lastPeerListRequestTime; int m_peerListPendingRequests; + + uint32_t m_protocolVersion; + int64_t m_pingTime; int m_blockPendingRequests;