Peer list refactoring

- Remove peers that weren't seen for > 1 hour
- Better peer selection algorithm for PEER_LIST_RESPONSE
This commit is contained in:
SChernykh 2021-10-15 11:32:01 +02:00
parent 9e90e988fa
commit 9b86f8e81f
2 changed files with 54 additions and 24 deletions

View file

@ -167,12 +167,6 @@ void P2PServer::on_connect_failed(bool is_v6, const raw_ip& ip, int port)
void P2PServer::update_peer_connections()
{
std::vector<Peer> peer_list;
{
MutexLock lock(m_peerListLock);
peer_list = m_peerList;
}
const time_t cur_time = time(nullptr);
const time_t last_updated = m_pool->side_chain().last_updated();
@ -181,13 +175,14 @@ void P2PServer::update_peer_connections()
MutexLock lock(m_clientsListLock);
connected_clients.reserve(m_numConnections);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
connected_clients.emplace_back(client->m_addr);
bool disconnected = false;
const int timeout = client->m_handshakeComplete ? 300 : 10;
if (cur_time >= client->m_lastAlive + timeout) {
const uint64_t idle_time = static_cast<uint64_t>(cur_time - client->m_lastAlive);
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting");
client->close();
disconnected = true;
}
if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) {
@ -201,11 +196,35 @@ void P2PServer::update_peer_connections()
client->ban(DEFAULT_BAN_TIME);
remove_peer_from_list(client);
client->close();
disconnected = true;
}
}
if (!disconnected) {
connected_clients.emplace_back(client->m_addr);
}
}
}
std::vector<Peer> peer_list;
{
MutexLock lock(m_peerListLock);
if ((m_timerCounter % 30) == 1) {
// Update last seen time for currently connected peers
for (Peer& p : m_peerList) {
if (std::find_if(connected_clients.begin(), connected_clients.end(), [&p](const raw_ip& addr) { return p.m_addr == addr; }) != connected_clients.end()) {
p.m_lastSeen = cur_time;
}
}
// Remove all peers that weren't seen for more than 1 hour
m_peerList.erase(std::remove_if(m_peerList.begin(), m_peerList.end(), [cur_time](const Peer& p) { return p.m_lastSeen + 3600 < cur_time; }), m_peerList.end());
}
peer_list = m_peerList;
}
// Try to have at least 8 outgoing connections
for (uint32_t i = m_numConnections - m_numIncomingConnections; (i < 8) && !peer_list.empty();) {
const uint64_t k = get_random64() % peer_list.size();
@ -429,8 +448,6 @@ void P2PServer::load_peer_list()
}
p.m_isV6 = true;
memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr));
p.m_port = port;
p.m_numFailedConnections = 0;
}
else {
sockaddr_in addr4;
@ -444,8 +461,6 @@ void P2PServer::load_peer_list()
p.m_addr.data[10] = 0xFF;
p.m_addr.data[11] = 0xFF;
memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr));
p.m_port = port;
p.m_numFailedConnections = 0;
}
bool already_added = false;
@ -456,6 +471,10 @@ void P2PServer::load_peer_list()
}
}
p.m_port = port;
p.m_numFailedConnections = 0;
p.m_lastSeen = time(nullptr);
if (!already_added && !is_banned(p.m_addr)) {
m_peerList.push_back(p);
}
@ -466,18 +485,21 @@ void P2PServer::load_peer_list()
void P2PServer::update_peer_in_list(bool is_v6, const raw_ip& ip, int port)
{
const time_t cur_time = time(nullptr);
MutexLock lock(m_peerListLock);
for (Peer& p : m_peerList) {
if ((p.m_isV6 == is_v6) && (p.m_addr == ip)) {
p.m_port = port;
p.m_numFailedConnections = 0;
p.m_lastSeen = cur_time;
return;
}
}
if (!is_banned(ip)) {
m_peerList.emplace_back(Peer{ is_v6, ip, port, 0 });
m_peerList.emplace_back(Peer{ is_v6, ip, port, 0, cur_time });
}
}
@ -1492,23 +1514,28 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
MutexLock lock(server->m_clientsListLock);
// Send every 4th peer on average, selected at random
const uint32_t n = server->m_numConnections;
const uint32_t peers_to_send_target = std::min<uint32_t>(PEER_LIST_RESPONSE_MAX_PEERS, std::max<uint32_t>(1, n / 4));
const uint32_t peers_to_send_target = std::min<uint32_t>(PEER_LIST_RESPONSE_MAX_PEERS, std::max<uint32_t>(1, server->m_numConnections / 4));
uint32_t n = 0;
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (client->m_listenPort < 0) {
if ((client->m_listenPort < 0) || (client->m_addr == m_addr)) {
continue;
}
uint64_t hi;
umul128(server->get_random64(), n, &hi);
const Peer p{ client->m_isV6, client->m_addr, client->m_listenPort, 0, 0 };
++n;
if ((hi < peers_to_send_target) && (client->m_addr != m_addr)) {
peers[num_selected_peers++] = { client->m_isV6, client->m_addr, client->m_listenPort, 0 };
// Use https://en.wikipedia.org/wiki/Reservoir_sampling algorithm
if (num_selected_peers < peers_to_send_target) {
peers[num_selected_peers++] = p;
continue;
}
if (num_selected_peers >= PEER_LIST_RESPONSE_MAX_PEERS) {
break;
}
uint64_t k;
umul128(server->get_random64(), n, &k);
if (k < peers_to_send_target) {
peers[k] = p;
}
}
}
@ -1542,6 +1569,7 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const
{
P2PServer* server = static_cast<P2PServer*>(m_owner);
const time_t cur_time = time(nullptr);
MutexLock lock(server->m_peerListLock);
@ -1558,15 +1586,16 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const
buf += 2;
bool already_added = false;
for (const Peer& p : server->m_peerList) {
for (Peer& p : server->m_peerList) {
if ((p.m_isV6 == is_v6) && (p.m_addr == ip)) {
already_added = true;
p.m_lastSeen = cur_time;
break;
}
}
if (!already_added && !server->is_banned(ip)) {
server->m_peerList.emplace_back(Peer{ is_v6, ip, port, 0 });
server->m_peerList.emplace_back(Peer{ is_v6, ip, port, 0, cur_time });
}
}

View file

@ -169,6 +169,7 @@ private:
raw_ip m_addr;
int m_port;
uint32_t m_numFailedConnections;
time_t m_lastSeen;
};
std::vector<Peer> m_peerList;