diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 3bf8693..58a4f7c 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -52,6 +52,7 @@ P2PServer::P2PServer(p2pool* pool) uv_mutex_init_checked(&m_blockLock); uv_mutex_init_checked(&m_peerListLock); uv_mutex_init_checked(&m_broadcastLock); + uv_mutex_init_checked(&m_missingBlockRequestsLock); uv_rwlock_init_checked(&m_cachedBlocksLock); int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast); @@ -97,6 +98,7 @@ P2PServer::~P2PServer() uv_mutex_destroy(&m_blockLock); uv_mutex_destroy(&m_peerListLock); uv_mutex_destroy(&m_broadcastLock); + uv_mutex_destroy(&m_missingBlockRequestsLock); uv_rwlock_destroy(&m_cachedBlocksLock); delete m_block; @@ -617,9 +619,26 @@ void P2PServer::download_missing_blocks() clients.emplace_back(client); } + if (clients.empty()) { + return; + } + // Try to download each block from a random client for (const hash& id : missing_blocks) { - send(clients[get_random64() % clients.size()], + P2PClient* client = clients[get_random64() % clients.size()]; + + { + MutexLock lock2(m_missingBlockRequestsLock); + + const uint64_t truncated_block_id = *reinterpret_cast(id.h); + if (!m_missingBlockRequests.insert({ client->m_peerId, truncated_block_id }).second) { + // We already asked this peer about this block + // Don't try to ask another peer, leave it for another timer tick + continue; + } + } + + send(client, [this, &id](void* buf) { uint8_t* p0 = reinterpret_cast(buf); diff --git a/src/p2p_server.h b/src/p2p_server.h index ce3e628..f3c5e35 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -174,6 +174,9 @@ private: uv_async_t m_broadcastAsync; std::vector m_broadcastQueue; + uv_mutex_t m_missingBlockRequestsLock; + std::set> m_missingBlockRequests; + static void on_broadcast(uv_async_t* handle) { reinterpret_cast(handle->data)->on_broadcast(); } void on_broadcast(); };