throttle log warnings in Connection.java to 30s

This commit is contained in:
woodser 2024-07-28 09:20:35 -04:00 committed by GitHub
parent 180fde87cc
commit 75b96e83da
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 59 additions and 54 deletions

View file

@ -171,9 +171,11 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final Capabilities capabilities = new Capabilities(); private final Capabilities capabilities = new Capabilities();
// throttle logs of reported invalid requests // throttle logs of reported invalid requests
private static long lastLoggedInvalidRequestReport = 0; private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // throttle logging rule violations and warnings to once every 30 seconds
private static int unloggedInvalidRequestReports = 0; private static long lastLoggedInvalidRequestReportTs = 0;
private static final long LOG_INVALID_REQUEST_REPORTS_INTERVAL_MS = 60000; // log invalid request reports once every 60s private static int numUnloggedInvalidRequestReports = 0;
private static long lastLoggedWarningTs = 0;
private static int numUnloggedWarnings = 0;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
@ -218,8 +220,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (peersNodeAddress != null) { if (peersNodeAddress != null) {
setPeersNodeAddress(peersNodeAddress); setPeersNodeAddress(peersNodeAddress);
if (banFilter != null && banFilter.isPeerBanned(peersNodeAddress)) { if (banFilter != null && banFilter.isPeerBanned(peersNodeAddress)) {
log.warn("We created an outbound connection with a banned peer"); reportInvalidRequest(RuleViolation.PEER_BANNED, "We created an outbound connection with a banned peer");
reportInvalidRequest(RuleViolation.PEER_BANNED);
} }
} }
ThreadUtils.execute(() -> connectionListener.onConnection(this), THREAD_ID); ThreadUtils.execute(() -> connectionListener.onConnection(this), THREAD_ID);
@ -249,8 +250,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (banFilter != null && if (banFilter != null &&
peersNodeAddressOptional.isPresent() && peersNodeAddressOptional.isPresent() &&
banFilter.isPeerBanned(peersNodeAddressOptional.get())) { banFilter.isPeerBanned(peersNodeAddressOptional.get())) {
log.warn("We tried to send a message to a banned peer. message={}", networkEnvelope.getClass().getSimpleName()); String errorMessage = "We tried to send a message to a banned peer. message=" + networkEnvelope.getClass().getSimpleName();
reportInvalidRequest(RuleViolation.PEER_BANNED); reportInvalidRequest(RuleViolation.PEER_BANNED, errorMessage);
return; return;
} }
@ -419,7 +420,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (networkEnvelope instanceof SendersNodeAddressMessage) { if (networkEnvelope instanceof SendersNodeAddressMessage) {
boolean isValid = processSendersNodeAddressMessage((SendersNodeAddressMessage) networkEnvelope); boolean isValid = processSendersNodeAddressMessage((SendersNodeAddressMessage) networkEnvelope);
if (!isValid) { if (!isValid) {
log.warn("Received an invalid {} at processing BundleOfEnvelopes", networkEnvelope.getClass().getSimpleName()); throttleWarn("Received an invalid " + networkEnvelope.getClass().getSimpleName() + " at processing BundleOfEnvelopes");
continue; continue;
} }
} }
@ -610,20 +611,20 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
* Runs in same thread as Connection * Runs in same thread as Connection
*/ */
public boolean reportInvalidRequest(RuleViolation ruleViolation) { public boolean reportInvalidRequest(RuleViolation ruleViolation, String errorMessage) {
return Connection.reportInvalidRequest(this, ruleViolation); return Connection.reportInvalidRequest(this, ruleViolation, errorMessage);
} }
private static synchronized boolean reportInvalidRequest(Connection connection, RuleViolation ruleViolation) { private static synchronized boolean reportInvalidRequest(Connection connection, RuleViolation ruleViolation, String errorMessage) {
// determine if report should be logged to avoid spamming the logs // determine if report should be logged to avoid spamming the logs
boolean logReport = System.currentTimeMillis() - lastLoggedInvalidRequestReport > LOG_INVALID_REQUEST_REPORTS_INTERVAL_MS; boolean logReport = System.currentTimeMillis() - lastLoggedInvalidRequestReportTs > LOG_THROTTLE_INTERVAL_MS;
// count the number of unlogged reports since last log entry // count the number of unlogged reports since last log entry
if (!logReport) unloggedInvalidRequestReports++; if (!logReport) numUnloggedInvalidRequestReports++;
// handle report // handle report
if (logReport) log.info("We got reported the ruleViolation {} at connection with address {} and uid {}", ruleViolation, connection.getPeersNodeAddressProperty(), connection.getUid()); if (logReport) log.warn("We got reported the ruleViolation {} at connection with address={}, uid={}, errorMessage={}", ruleViolation, connection.getPeersNodeAddressProperty(), connection.getUid(), errorMessage);
int numRuleViolations; int numRuleViolations;
numRuleViolations = connection.ruleViolations.getOrDefault(ruleViolation, 0); numRuleViolations = connection.ruleViolations.getOrDefault(ruleViolation, 0);
numRuleViolations++; numRuleViolations++;
@ -654,9 +655,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private static synchronized void resetReportedInvalidRequestsThrottle(boolean logReport) { private static synchronized void resetReportedInvalidRequestsThrottle(boolean logReport) {
if (logReport) { if (logReport) {
if (unloggedInvalidRequestReports > 0) log.warn("We received {} other reports of invalid requests since the last log entry", unloggedInvalidRequestReports); if (numUnloggedInvalidRequestReports > 0) log.warn("We received {} other reports of invalid requests since the last log entry", numUnloggedInvalidRequestReports);
unloggedInvalidRequestReports = 0; numUnloggedInvalidRequestReports = 0;
lastLoggedInvalidRequestReport = System.currentTimeMillis(); lastLoggedInvalidRequestReportTs = System.currentTimeMillis();
} }
} }
@ -673,25 +674,22 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
else else
closeConnectionReason = CloseConnectionReason.RESET; closeConnectionReason = CloseConnectionReason.RESET;
log.info("SocketException (expected if connection lost). closeConnectionReason={}; connection={}", closeConnectionReason, this); throttleWarn("SocketException (expected if connection lost). closeConnectionReason=" + closeConnectionReason + "; connection=" + this);
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) { } else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT; closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
log.info("Shut down caused by exception {} on connection={}", e, this); throttleWarn("Shut down caused by exception " + e.getMessage() + " on connection=" + this);
} else if (e instanceof EOFException) { } else if (e instanceof EOFException) {
closeConnectionReason = CloseConnectionReason.TERMINATED; closeConnectionReason = CloseConnectionReason.TERMINATED;
log.warn("Shut down caused by exception {} on connection={}", e, this); throttleWarn("Shut down caused by exception " + e.getMessage() + " on connection=" + this);
} else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) { } else if (e instanceof OptionalDataException || e instanceof StreamCorruptedException) {
closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA; closeConnectionReason = CloseConnectionReason.CORRUPTED_DATA;
log.warn("Shut down caused by exception {} on connection={}", e, this); throttleWarn("Shut down caused by exception " + e.getMessage() + " on connection=" + this);
} else { } else {
// TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException // TODO sometimes we get StreamCorruptedException, OptionalDataException, IllegalStateException
closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION; closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
log.warn("Unknown reason for exception at socket: {}\n\t" + throttleWarn("Unknown reason for exception at socket: " + socket.toString() + "\n\t" +
"peer={}\n\t" + "peer=" + this.peersNodeAddressOptional + "\n\t" +
"Exception={}", "Exception=" + e.toString());
socket.toString(),
this.peersNodeAddressOptional,
e.toString());
} }
shutDown(closeConnectionReason); shutDown(closeConnectionReason);
} }
@ -712,8 +710,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
} }
if (banFilter != null && banFilter.isPeerBanned(senderNodeAddress)) { if (banFilter != null && banFilter.isPeerBanned(senderNodeAddress)) {
log.warn("We got a message from a banned peer. message={}", sendersNodeAddressMessage.getClass().getSimpleName()); String errorMessage = "We got a message from a banned peer. message=" + sendersNodeAddressMessage.getClass().getSimpleName();
reportInvalidRequest(RuleViolation.PEER_BANNED); reportInvalidRequest(RuleViolation.PEER_BANNED, errorMessage);
return false; return false;
} }
@ -745,7 +743,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
try { try {
if (socket != null && if (socket != null &&
socket.isClosed()) { socket.isClosed()) {
log.warn("Socket is null or closed socket={}", socket); throttleWarn("Socket is null or closed socket=" + socket);
shutDown(CloseConnectionReason.SOCKET_CLOSED); shutDown(CloseConnectionReason.SOCKET_CLOSED);
return; return;
} }
@ -757,7 +755,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (socket != null && if (socket != null &&
socket.isClosed()) { socket.isClosed()) {
log.warn("Socket is null or closed socket={}", socket); throttleWarn("Socket is null or closed socket=" + socket);
shutDown(CloseConnectionReason.SOCKET_CLOSED); shutDown(CloseConnectionReason.SOCKET_CLOSED);
return; return;
} }
@ -767,9 +765,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return; return;
} }
if (protoInputStream.read() == -1) { if (protoInputStream.read() == -1) {
log.warn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown."); throttleWarn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
} else { } else {
log.warn("proto is null. protoInputStream.read()=" + protoInputStream.read()); throttleWarn("proto is null. protoInputStream.read()=" + protoInputStream.read());
} }
shutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV); shutDown(CloseConnectionReason.NO_PROTO_BUFFER_ENV);
return; return;
@ -778,8 +776,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (banFilter != null && if (banFilter != null &&
peersNodeAddressOptional.isPresent() && peersNodeAddressOptional.isPresent() &&
banFilter.isPeerBanned(peersNodeAddressOptional.get())) { banFilter.isPeerBanned(peersNodeAddressOptional.get())) {
log.warn("We got a message from a banned peer. proto={}", Utilities.toTruncatedString(proto)); String errorMessage = "We got a message from a banned peer. proto=" + Utilities.toTruncatedString(proto);
reportInvalidRequest(RuleViolation.PEER_BANNED); reportInvalidRequest(RuleViolation.PEER_BANNED, errorMessage);
return; return;
} }
@ -814,30 +812,28 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage && if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage &&
!((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().verifyHashSize()) { !((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().verifyHashSize()) {
log.warn("PersistableNetworkPayload.verifyHashSize failed. hashSize={}; object={}", String errorMessage = "PersistableNetworkPayload.verifyHashSize failed. hashSize=" +
((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().getHash().length, ((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload().getHash().length + "; object=" +
Utilities.toTruncatedString(proto)); Utilities.toTruncatedString(proto);
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED)) if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED, errorMessage))
return; return;
} }
if (exceeds) { if (exceeds) {
log.warn("size > MAX_MSG_SIZE. size={}; object={}", size, Utilities.toTruncatedString(proto)); String errorMessage = "size > MAX_MSG_SIZE. size=" + size + "; object=" + Utilities.toTruncatedString(proto);
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED, errorMessage))
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return; return;
} }
if (violatesThrottleLimit() && reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED)) if (violatesThrottleLimit() && reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED, "Violates throttle limit"))
return; return;
// Check P2P network ID // Check P2P network ID
String errorMessage = "RuleViolation.WRONG_NETWORK_ID. version of message=" + proto.getMessageVersion() +
", app version=" + Version.getP2PMessageVersion() +
", proto.toTruncatedString=" + Utilities.toTruncatedString(proto.toString());
if (!proto.getMessageVersion().equals(Version.getP2PMessageVersion()) if (!proto.getMessageVersion().equals(Version.getP2PMessageVersion())
&& reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) { && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID, errorMessage)) {
log.warn("RuleViolation.WRONG_NETWORK_ID. version of message={}, app version={}, " +
"proto.toTruncatedString={}", proto.getMessageVersion(),
Version.getP2PMessageVersion(),
Utilities.toTruncatedString(proto.toString()));
return; return;
} }
@ -879,12 +875,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
ThreadUtils.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size), THREAD_ID); ThreadUtils.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size), THREAD_ID);
} }
} catch (InvalidClassException e) { } catch (InvalidClassException e) {
log.error(e.getMessage()); reportInvalidRequest(RuleViolation.INVALID_CLASS, e.getMessage());
e.printStackTrace();
reportInvalidRequest(RuleViolation.INVALID_CLASS);
} catch (ProtobufferException | NoClassDefFoundError | InvalidProtocolBufferException e) { } catch (ProtobufferException | NoClassDefFoundError | InvalidProtocolBufferException e) {
log.error(e.getMessage()); reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE, e.getMessage());
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
} catch (Throwable t) { } catch (Throwable t) {
handleException(t); handleException(t);
} }
@ -943,4 +936,16 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
NodeAddress nodeAddress = getSenderNodeAddress(networkEnvelope); NodeAddress nodeAddress = getSenderNodeAddress(networkEnvelope);
return nodeAddress == null ? "null" : nodeAddress.getFullAddress(); return nodeAddress == null ? "null" : nodeAddress.getFullAddress();
} }
private synchronized void throttleWarn(String msg) {
boolean logWarning = System.currentTimeMillis() - lastLoggedWarningTs > LOG_THROTTLE_INTERVAL_MS;
if (logWarning) {
log.warn(msg);
if (numUnloggedWarnings > 0) log.warn("We received {} other log warnings since the last log entry", numUnloggedWarnings);
numUnloggedWarnings = 0;
lastLoggedWarningTs = System.currentTimeMillis();
} else {
numUnloggedWarnings++;
}
}
} }

View file

@ -381,7 +381,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
// If a node is trying to send too many list we treat it as rule violation. // If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom. // Reported list include the connected list. We use the max value and give some extra headroom.
// Will trigger a shutdown after 2nd time sending too much // Will trigger a shutdown after 2nd time sending too much
connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT); connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT, "Too many reported peers sent");
} }
} }