P2PServer: sync from the fastest peer if possible

This commit is contained in:
SChernykh 2022-09-13 10:51:15 +02:00
parent f813cf6d36
commit afa9cf371e
3 changed files with 63 additions and 40 deletions

View file

@ -58,6 +58,7 @@ P2PServer::P2PServer(p2pool* pool)
, m_timerInterval(2)
, m_peerListLastSaved(0)
, m_lookForMissingBlocks(true)
, m_fastestPeer(nullptr)
{
m_blockDeserializeBuf.reserve(131072);
@ -88,7 +89,6 @@ P2PServer::P2PServer(p2pool* pool)
uv_mutex_init_checked(&m_blockLock);
uv_mutex_init_checked(&m_peerListLock);
uv_mutex_init_checked(&m_broadcastLock);
uv_mutex_init_checked(&m_missingBlockRequestsLock);
uv_rwlock_init_checked(&m_cachedBlocksLock);
uv_mutex_init_checked(&m_connectToPeersLock);
@ -151,7 +151,6 @@ P2PServer::~P2PServer()
uv_mutex_destroy(&m_blockLock);
uv_mutex_destroy(&m_peerListLock);
uv_mutex_destroy(&m_broadcastLock);
uv_mutex_destroy(&m_missingBlockRequestsLock);
clear_cached_blocks();
uv_rwlock_destroy(&m_cachedBlocksLock);
@ -273,6 +272,7 @@ void P2PServer::update_peer_connections()
const uint64_t last_updated = m_pool->side_chain().last_updated();
bool has_good_peers = false;
m_fastestPeer = nullptr;
unordered_set<raw_ip> connected_clients;
{
@ -306,6 +306,9 @@ void P2PServer::update_peer_connections()
connected_clients.insert(client->m_addr);
if (client->is_good()) {
has_good_peers = true;
if ((client->m_pingTime >= 0) && (!m_fastestPeer || (m_fastestPeer->m_pingTime > client->m_pingTime))) {
m_fastestPeer = client;
}
}
}
}
@ -373,6 +376,13 @@ void P2PServer::update_peer_list()
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (client->is_good() && (cur_time >= client->m_nextOutgoingPeerListRequest)) {
send_peer_list_request(client, cur_time);
}
}
}
void P2PServer::send_peer_list_request(P2PClient* client, uint64_t cur_time)
{
// Send peer list requests at random intervals (60-120 seconds)
client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61));
@ -393,8 +403,6 @@ void P2PServer::update_peer_list()
client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now();
++client->m_peerListPendingRequests;
}
}
}
}
void P2PServer::save_peer_list_async()
@ -1006,8 +1014,6 @@ void P2PServer::download_missing_blocks()
if (missing_blocks.empty()) {
m_lookForMissingBlocks = false;
MutexLock lock(m_missingBlockRequestsLock);
m_missingBlockRequests.clear();
return;
}
@ -1037,16 +1043,12 @@ void P2PServer::download_missing_blocks()
for (const hash& id : missing_blocks) {
P2PClient* client = clients[get_random64() % clients.size()];
{
MutexLock lock3(m_missingBlockRequestsLock);
const uint64_t truncated_block_id = *reinterpret_cast<const uint64_t*>(id.h);
if (!m_missingBlockRequests.insert({ client->m_peerId, truncated_block_id }).second) {
// We already asked this peer about this block
// Don't try to ask another peer, leave it for another timer tick
continue;
}
}
if (m_cachedBlocks) {
auto it = m_cachedBlocks->find(id);
@ -1112,7 +1114,7 @@ P2PServer::P2PClient::P2PClient()
, m_nextOutgoingPeerListRequest(0)
, m_lastPeerListRequestTime{}
, m_peerListPendingRequests(0)
, m_pingTime(0)
, m_pingTime(-1)
, m_blockPendingRequests(0)
, m_chainTipBlockRequest(false)
, m_lastAlive(0)
@ -1128,6 +1130,12 @@ P2PServer::P2PClient::~P2PClient()
void P2PServer::P2PClient::reset()
{
P2PServer* server = static_cast<P2PServer*>(m_owner);
if (server && (server->m_fastestPeer == this)) {
server->m_fastestPeer = nullptr;
}
Client::reset();
m_peerId = 0;
@ -1142,7 +1150,7 @@ void P2PServer::P2PClient::reset()
m_nextOutgoingPeerListRequest = 0;
m_lastPeerListRequestTime = {};
m_peerListPendingRequests = 0;
m_pingTime = 0;
m_pingTime = -1;
m_blockPendingRequests = 0;
m_chainTipBlockRequest = false;
m_lastAlive = 0;
@ -1379,7 +1387,7 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size)
bytes_read = 2u + num_peers * 19u;
using namespace std::chrono;
m_pingTime = duration_cast<milliseconds>(high_resolution_clock::now() - m_lastPeerListRequestTime).count();
m_pingTime = std::max<int64_t>(duration_cast<milliseconds>(high_resolution_clock::now() - m_lastPeerListRequestTime).count(), 0);
--m_peerListPendingRequests;
if (!on_peer_list_response(buf + 1)) {
@ -1417,11 +1425,16 @@ void P2PServer::P2PClient::on_read_failed(int /*err*/)
void P2PServer::P2PClient::on_disconnected()
{
P2PServer* server = static_cast<P2PServer*>(m_owner);
if (server && (server->m_fastestPeer == this)) {
server->m_fastestPeer = nullptr;
}
if (!m_handshakeComplete) {
LOGWARN(5, "peer " << static_cast<char*>(m_addrString) << " disconnected before finishing handshake");
ban(DEFAULT_BAN_TIME);
P2PServer* server = static_cast<P2PServer*>(m_owner);
if (server) {
server->remove_peer_from_list(this);
}
@ -1835,6 +1848,8 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size)
LOGWARN(4, "peer " << static_cast<char*>(m_addrString) << " is mining on top of a stale block (mainchain height " << peer_height << ", expected >= " << our_height << ')');
return false;
}
server->send_peer_list_request(this, seconds_since_epoch());
}
return handle_incoming_block_async(server->get_block());
@ -2104,6 +2119,12 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count
P2PServer* server = static_cast<P2PServer*>(m_owner);
// If the initial sync is not finished yet, try to ask the fastest peer too
P2PClient* c = server->m_fastestPeer;
if (c && (c != this) && !server->m_pool->side_chain().precalcFinished()) {
c->post_handle_incoming_block(c->m_resetCounter.load(), missing_blocks);
}
ReadLock lock(server->m_cachedBlocksLock);
for (const hash& id : missing_blocks) {
@ -2116,7 +2137,7 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count
}
}
const bool result = m_owner->send(this,
const bool result = server->send(this,
[&id](void* buf, size_t buf_size) -> size_t
{
LOGINFO(5, "sending BLOCK_REQUEST for id = " << id);

View file

@ -167,6 +167,7 @@ private:
void check_zmq();
void update_peer_connections();
void update_peer_list();
void send_peer_list_request(P2PClient* client, uint64_t cur_time);
void save_peer_list_async();
void save_peer_list();
void load_peer_list();
@ -216,10 +217,10 @@ private:
std::vector<Broadcast*> m_broadcastQueue;
bool m_lookForMissingBlocks;
uv_mutex_t m_missingBlockRequestsLock;
unordered_set<std::pair<uint64_t, uint64_t>> m_missingBlockRequests;
P2PClient* m_fastestPeer;
static void on_broadcast(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->on_broadcast(); }
void on_broadcast();

View file

@ -76,6 +76,7 @@ public:
bool is_mini() const;
const PoolBlock* chainTip() const { return m_chainTip; }
bool precalcFinished() const { return m_precalcFinished.load(); }
static bool split_reward(uint64_t reward, const std::vector<MinerShare>& shares, std::vector<uint64_t>& rewards);