synchronize peer manager's reported peers to fix concurrency error

This commit is contained in:
woodser 2024-01-01 08:32:54 -05:00
parent 5d88936600
commit 9a6a9ac93e

View file

@ -365,14 +365,16 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
// We check if the reported msg is not violating our rules // We check if the reported msg is not violating our rules
if (peers.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) { if (peers.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) {
reportedPeers.addAll(peers); synchronized (reportedPeers) {
purgeReportedPeersIfExceeds(); reportedPeers.addAll(peers);
purgeReportedPeersIfExceeds();
getPersistedPeers().addAll(peers); getPersistedPeers().addAll(peers);
purgePersistedPeersIfExceeds(); purgePersistedPeersIfExceeds();
requestPersistence(); requestPersistence();
printReportedPeers(); printReportedPeers();
}
} else { } else {
// If a node is trying to send too many list we treat it as rule violation. // If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom. // Reported list include the connected list. We use the max value and give some extra headroom.
@ -589,8 +591,11 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void removeReportedPeer(Peer reportedPeer) { private void removeReportedPeer(Peer reportedPeer) {
reportedPeers.remove(reportedPeer); synchronized (reportedPeers) {
printReportedPeers(); reportedPeers.remove(reportedPeer);
printReportedPeers();
}
} }
private void removeReportedPeer(NodeAddress nodeAddress) { private void removeReportedPeer(NodeAddress nodeAddress) {
@ -611,35 +616,39 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
private void purgeReportedPeersIfExceeds() { private void purgeReportedPeersIfExceeds() {
int size = reportedPeers.size(); synchronized (reportedPeers) {
if (size > MAX_REPORTED_PEERS) { int size = reportedPeers.size();
log.info("We have already {} reported peers which exceeds our limit of {}." + if (size > MAX_REPORTED_PEERS) {
"We remove random peers from the reported peers list.", size, MAX_REPORTED_PEERS); log.info("We have already {} reported peers which exceeds our limit of {}." +
int diff = size - MAX_REPORTED_PEERS; "We remove random peers from the reported peers list.", size, MAX_REPORTED_PEERS);
List<Peer> list = new ArrayList<>(reportedPeers); int diff = size - MAX_REPORTED_PEERS;
// we don't use sorting by lastActivityDate to keep it more random List<Peer> list = new ArrayList<>(reportedPeers);
for (int i = 0; i < diff; i++) { // we don't use sorting by lastActivityDate to keep it more random
if (!list.isEmpty()) { for (int i = 0; i < diff; i++) {
Peer toRemove = list.remove(new Random().nextInt(list.size())); if (!list.isEmpty()) {
removeReportedPeer(toRemove); Peer toRemove = list.remove(new Random().nextInt(list.size()));
removeReportedPeer(toRemove);
}
} }
} else {
log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);
} }
} else {
log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);
} }
} }
private void printReportedPeers() { private void printReportedPeers() {
if (!reportedPeers.isEmpty()) { synchronized (reportedPeers) {
if (PRINT_REPORTED_PEERS_DETAILS) { if (!reportedPeers.isEmpty()) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + if (PRINT_REPORTED_PEERS_DETAILS) {
"Collected reported peers:"); StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers); "Collected reported peers:");
reportedPeersClone.forEach(e -> result.append("\n").append(e)); List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
result.append("\n------------------------------------------------------------\n"); reportedPeersClone.forEach(e -> result.append("\n").append(e));
log.trace(result.toString()); result.append("\n------------------------------------------------------------\n");
log.trace(result.toString());
}
log.debug("Number of reported peers: {}", reportedPeers.size());
} }
log.debug("Number of reported peers: {}", reportedPeers.size());
} }
} }