From 870630b381fe62a5ab68a466ad4f702c723c3b46 Mon Sep 17 00:00:00 2001 From: woodser Date: Wed, 31 May 2023 08:05:21 -0400 Subject: [PATCH] synchronize P2PDataStorage to avoid race conditions --- .../network/p2p/storage/P2PDataStorage.java | 461 +++++++++--------- 1 file changed, 239 insertions(+), 222 deletions(-) diff --git a/p2p/src/main/java/haveno/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/haveno/network/p2p/storage/P2PDataStorage.java index 36dd93ee..45017c99 100644 --- a/p2p/src/main/java/haveno/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/haveno/network/p2p/storage/P2PDataStorage.java @@ -232,8 +232,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers appendOnlyDataStoreService.readFromResources(postFix, () -> appendOnlyDataStoreServiceReady.set(true)); protectedDataStoreService.readFromResources(postFix, () -> { - map.putAll(protectedDataStoreService.getMap()); - protectedDataStoreServiceReady.set(true); + synchronized (map) { + map.putAll(protectedDataStoreService.getMap()); + protectedDataStoreServiceReady.set(true); + } }); resourceDataStoreService.readFromResources(postFix, () -> resourceDataStoreServiceReady.set(true)); } @@ -241,20 +243,24 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers // Uses synchronous execution on the userThread. Only used by tests. The async methods should be used by app code. @VisibleForTesting public void readFromResourcesSync(String postFix) { - appendOnlyDataStoreService.readFromResourcesSync(postFix); - protectedDataStoreService.readFromResourcesSync(postFix); - resourceDataStoreService.readFromResourcesSync(postFix); - - map.putAll(protectedDataStoreService.getMap()); + synchronized (map) { + appendOnlyDataStoreService.readFromResourcesSync(postFix); + protectedDataStoreService.readFromResourcesSync(postFix); + resourceDataStoreService.readFromResourcesSync(postFix); + + map.putAll(protectedDataStoreService.getMap()); + } } // We get added mailbox message data from MailboxMessageService. We want to add those early so we can get it added // to our excluded keys to reduce initial data response data size. public void addProtectedMailboxStorageEntryToMap(ProtectedStorageEntry protectedStorageEntry) { - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - map.put(hashOfPayload, protectedStorageEntry); - //log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap()); + synchronized (map) { + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); + map.put(hashOfPayload, protectedStorageEntry); + //log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap()); + } } /////////////////////////////////////////////////////////////////////////////////////////// @@ -627,28 +633,30 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers @VisibleForTesting void removeExpiredEntries() { - // The moment when an object becomes expired will not be synchronous in the network and we could - // get add network_messages after the object has expired. To avoid repeated additions of already expired - // object when we get it sent from new peers, we don’t remove the sequence number from the map. - // That way an ADD message for an already expired data will fail because the sequence number - // is equal and not larger as expected. - ArrayList> toRemoveList = map.entrySet().stream() - .filter(entry -> entry.getValue().isExpired(this.clock)) - .collect(Collectors.toCollection(ArrayList::new)); + synchronized (map) { + // The moment when an object becomes expired will not be synchronous in the network and we could + // get add network_messages after the object has expired. To avoid repeated additions of already expired + // object when we get it sent from new peers, we don’t remove the sequence number from the map. + // That way an ADD message for an already expired data will fail because the sequence number + // is equal and not larger as expected. + ArrayList> toRemoveList = map.entrySet().stream() + .filter(entry -> entry.getValue().isExpired(this.clock)) + .collect(Collectors.toCollection(ArrayList::new)); - // Batch processing can cause performance issues, so do all of the removes first, then update the listeners - // to let them know about the removes. - if (log.isDebugEnabled()) { - toRemoveList.forEach(toRemoveItem -> { - log.debug("We found an expired data entry. We remove the protectedData:\n\t{}", - Utilities.toTruncatedString(toRemoveItem.getValue())); - }); - } - removeFromMapAndDataStore(toRemoveList); + // Batch processing can cause performance issues, so do all of the removes first, then update the listeners + // to let them know about the removes. + if (log.isDebugEnabled()) { + toRemoveList.forEach(toRemoveItem -> { + log.debug("We found an expired data entry. We remove the protectedData:\n\t{}", + Utilities.toTruncatedString(toRemoveItem.getValue())); + }); + } + removeFromMapAndDataStore(toRemoveList); - if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) { - sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap())); - requestPersistence(); + if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) { + sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap())); + requestPersistence(); + } } } @@ -699,22 +707,24 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers NodeAddress peersNodeAddress = connection.getPeersNodeAddressOptional().get(); // Backdate all the eligible payloads based on the node that disconnected - map.values().stream() - .filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload) - .filter(protectedStorageEntry -> ((RequiresOwnerIsOnlinePayload) protectedStorageEntry.getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress)) - .forEach(protectedStorageEntry -> { - // We only set the data back by half of the TTL and remove the data only if is has - // expired after that back dating. - // We might get connection drops which are not caused by the node going offline, so - // we give more tolerance with that approach, giving the node the chance to - // refresh the TTL with a refresh message. - // We observed those issues during stress tests, but it might have been caused by the - // test set up (many nodes/connections over 1 router) - // TODO investigate what causes the disconnections. - // Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException) - log.debug("Backdating {} due to closeConnectionReason={}", protectedStorageEntry, closeConnectionReason); - protectedStorageEntry.backDate(); - }); + synchronized (map) { + map.values().stream() + .filter(protectedStorageEntry -> protectedStorageEntry.getProtectedStoragePayload() instanceof RequiresOwnerIsOnlinePayload) + .filter(protectedStorageEntry -> ((RequiresOwnerIsOnlinePayload) protectedStorageEntry.getProtectedStoragePayload()).getOwnerNodeAddress().equals(peersNodeAddress)) + .forEach(protectedStorageEntry -> { + // We only set the data back by half of the TTL and remove the data only if is has + // expired after that back dating. + // We might get connection drops which are not caused by the node going offline, so + // we give more tolerance with that approach, giving the node the chance to + // refresh the TTL with a refresh message. + // We observed those issues during stress tests, but it might have been caused by the + // test set up (many nodes/connections over 1 router) + // TODO investigate what causes the disconnections. + // Usually the are: SOCKET_TIMEOUT ,TERMINATED (EOFException) + log.debug("Backdating {} due to closeConnectionReason={}", protectedStorageEntry, closeConnectionReason); + protectedStorageEntry.backDate(); + }); + } } /////////////////////////////////////////////////////////////////////////////////////////// @@ -818,81 +828,83 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener, boolean allowBroadcast) { - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - - //log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap()); - - // We do that check early as it is a very common case for returning, so we return early - // If we have seen a more recent operation for this payload and we have a payload locally, ignore it - ProtectedStorageEntry storedEntry = map.get(hashOfPayload); - if (storedEntry != null && !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) { - log.trace("## hasSequenceNrIncreased is false. hash={}", hashOfPayload); - return false; + synchronized (map) { + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); + + //log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap()); + + // We do that check early as it is a very common case for returning, so we return early + // If we have seen a more recent operation for this payload and we have a payload locally, ignore it + ProtectedStorageEntry storedEntry = map.get(hashOfPayload); + if (storedEntry != null && !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) { + log.trace("## hasSequenceNrIncreased is false. hash={}", hashOfPayload); + return false; + } + + if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) { + log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " + + "We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString()); + return false; + } + + // To avoid that expired data get stored and broadcast we check for expire date. + if (protectedStorageEntry.isExpired(clock)) { + String peer = sender != null ? sender.getFullAddress() : "sender is null"; + log.trace("## We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}", + peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName()); + return false; + } + + // We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is + // the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup. + MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload); + if (sequenceNumberMapValue != null && + protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) { + log.trace("## sequenceNr too low hash={}", hashOfPayload); + return false; + } + + // Verify the ProtectedStorageEntry is well formed and valid for the add operation + if (!protectedStorageEntry.isValidForAddOperation()) { + log.trace("## !isValidForAddOperation hash={}", hashOfPayload); + return false; + } + + // If we have already seen an Entry with the same hash, verify the metadata is equal + if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) { + log.trace("## !matchesRelevantPubKey hash={}", hashOfPayload); + return false; + } + + // Test against filterPredicate set from FilterManager + if (filterPredicate != null && + !filterPredicate.test(protectedStorageEntry.getProtectedStoragePayload())) { + log.debug("filterPredicate test failed. hashOfPayload={}", hashOfPayload); + return false; + } + + // This is an updated entry. Record it and signal listeners. + map.put(hashOfPayload, protectedStorageEntry); + hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry))); + + // Record the updated sequence number and persist it. Higher delay so we can batch more items. + sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); + requestPersistence(); + + //log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap()); + + // Optionally, broadcast the add/update depending on the calling environment + if (allowBroadcast) { + broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, listener); + log.trace("## broadcasted ProtectedStorageEntry. hash={}", hashOfPayload); + } + // Persist ProtectedStorageEntries carrying PersistablePayload payloads + if (protectedStoragePayload instanceof PersistablePayload) + protectedDataStoreService.put(hashOfPayload, protectedStorageEntry); + + return true; } - - if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) { - log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " + - "We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString()); - return false; - } - - // To avoid that expired data get stored and broadcast we check for expire date. - if (protectedStorageEntry.isExpired(clock)) { - String peer = sender != null ? sender.getFullAddress() : "sender is null"; - log.trace("## We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}", - peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName()); - return false; - } - - // We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is - // the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup. - MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload); - if (sequenceNumberMapValue != null && - protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) { - log.trace("## sequenceNr too low hash={}", hashOfPayload); - return false; - } - - // Verify the ProtectedStorageEntry is well formed and valid for the add operation - if (!protectedStorageEntry.isValidForAddOperation()) { - log.trace("## !isValidForAddOperation hash={}", hashOfPayload); - return false; - } - - // If we have already seen an Entry with the same hash, verify the metadata is equal - if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) { - log.trace("## !matchesRelevantPubKey hash={}", hashOfPayload); - return false; - } - - // Test against filterPredicate set from FilterManager - if (filterPredicate != null && - !filterPredicate.test(protectedStorageEntry.getProtectedStoragePayload())) { - log.debug("filterPredicate test failed. hashOfPayload={}", hashOfPayload); - return false; - } - - // This is an updated entry. Record it and signal listeners. - map.put(hashOfPayload, protectedStorageEntry); - hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry))); - - // Record the updated sequence number and persist it. Higher delay so we can batch more items. - sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); - requestPersistence(); - - //log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap()); - - // Optionally, broadcast the add/update depending on the calling environment - if (allowBroadcast) { - broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, listener); - log.trace("## broadcasted ProtectedStorageEntry. hash={}", hashOfPayload); - } - // Persist ProtectedStorageEntries carrying PersistablePayload payloads - if (protectedStoragePayload instanceof PersistablePayload) - protectedDataStoreService.put(hashOfPayload, protectedStorageEntry); - - return true; } /** @@ -935,50 +947,51 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers */ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, @Nullable NodeAddress sender) { - - try { - ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload()); - ProtectedStorageEntry storedData = map.get(hashOfPayload); - - if (storedData == null) { - log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing."); - + synchronized (map) { + try { + ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload()); + ProtectedStorageEntry storedData = map.get(hashOfPayload); + + if (storedData == null) { + log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing."); + + return false; + } + + ProtectedStorageEntry storedEntry = map.get(hashOfPayload); + ProtectedStorageEntry updatedEntry = new ProtectedStorageEntry( + storedEntry.getProtectedStoragePayload(), + storedEntry.getOwnerPubKey(), + refreshTTLMessage.getSequenceNumber(), + refreshTTLMessage.getSignature(), + this.clock); + + + // If we have seen a more recent operation for this payload, we ignore the current one + if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload)) + return false; + + // Verify the updated ProtectedStorageEntry is well formed and valid for update + if (!updatedEntry.isValidForAddOperation()) + return false; + + // Update the hash map with the updated entry + map.put(hashOfPayload, updatedEntry); + + // Record the latest sequence number and persist it + sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis())); + requestPersistence(); + + // Always broadcast refreshes + broadcaster.broadcast(refreshTTLMessage, sender); + + } catch (IllegalArgumentException e) { + log.error("refreshTTL failed, missing data: {}", e.toString()); + e.printStackTrace(); return false; } - - ProtectedStorageEntry storedEntry = map.get(hashOfPayload); - ProtectedStorageEntry updatedEntry = new ProtectedStorageEntry( - storedEntry.getProtectedStoragePayload(), - storedEntry.getOwnerPubKey(), - refreshTTLMessage.getSequenceNumber(), - refreshTTLMessage.getSignature(), - this.clock); - - - // If we have seen a more recent operation for this payload, we ignore the current one - if (!hasSequenceNrIncreased(updatedEntry.getSequenceNumber(), hashOfPayload)) - return false; - - // Verify the updated ProtectedStorageEntry is well formed and valid for update - if (!updatedEntry.isValidForAddOperation()) - return false; - - // Update the hash map with the updated entry - map.put(hashOfPayload, updatedEntry); - - // Record the latest sequence number and persist it - sequenceNumberMap.put(hashOfPayload, new MapValue(updatedEntry.getSequenceNumber(), this.clock.millis())); - requestPersistence(); - - // Always broadcast refreshes - broadcaster.broadcast(refreshTTLMessage, sender); - - } catch (IllegalArgumentException e) { - log.error("refreshTTL failed, missing data: {}", e.toString()); - e.printStackTrace(); - return false; + return true; } - return true; } /** @@ -991,48 +1004,50 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers */ public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender) { - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - - // If we have seen a more recent operation for this payload, ignore this one - if (!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) - return false; - - // Verify the ProtectedStorageEntry is well formed and valid for the remove operation - if (!protectedStorageEntry.isValidForRemoveOperation()) - return false; - - // If we have already seen an Entry with the same hash, verify the metadata is the same - ProtectedStorageEntry storedEntry = map.get(hashOfPayload); - if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) - return false; - - // Record the latest sequence number and persist it - sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); - requestPersistence(); - - // Update that we have seen this AddOncePayload so the next time it is seen it fails verification - if (protectedStoragePayload instanceof AddOncePayload) { - removedPayloadsService.addHash(hashOfPayload); + synchronized (map) { + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); + + // If we have seen a more recent operation for this payload, ignore this one + if (!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) + return false; + + // Verify the ProtectedStorageEntry is well formed and valid for the remove operation + if (!protectedStorageEntry.isValidForRemoveOperation()) + return false; + + // If we have already seen an Entry with the same hash, verify the metadata is the same + ProtectedStorageEntry storedEntry = map.get(hashOfPayload); + if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) + return false; + + // Record the latest sequence number and persist it + sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); + requestPersistence(); + + // Update that we have seen this AddOncePayload so the next time it is seen it fails verification + if (protectedStoragePayload instanceof AddOncePayload) { + removedPayloadsService.addHash(hashOfPayload); + } + + if (storedEntry != null) { + // Valid remove entry, do the remove and signal listeners + removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload); + } /* else { + // This means the RemoveData or RemoveMailboxData was seen prior to the AddData. We have already updated + // the SequenceNumberMap appropriately so the stale Add will not pass validation, but we still want to + // broadcast the remove to peers so they can update their state appropriately + } */ + printData("after remove"); + + if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) { + broadcaster.broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender); + } else { + broadcaster.broadcast(new RemoveDataMessage(protectedStorageEntry), sender); + } + + return true; } - - if (storedEntry != null) { - // Valid remove entry, do the remove and signal listeners - removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload); - } /* else { - // This means the RemoveData or RemoveMailboxData was seen prior to the AddData. We have already updated - // the SequenceNumberMap appropriately so the stale Add will not pass validation, but we still want to - // broadcast the remove to peers so they can update their state appropriately - } */ - printData("after remove"); - - if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) { - broadcaster.broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender); - } else { - broadcaster.broadcast(new RemoveDataMessage(protectedStorageEntry), sender); - } - - return true; } public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload, @@ -1107,30 +1122,32 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers } private void removeFromMapAndDataStore(Collection> entriesToRemove) { - if (entriesToRemove.isEmpty()) - return; + synchronized (map) { + if (entriesToRemove.isEmpty()) + return; - List removedProtectedStorageEntries = new ArrayList<>(entriesToRemove.size()); - entriesToRemove.forEach(entry -> { - ByteArray hashOfPayload = entry.getKey(); - ProtectedStorageEntry protectedStorageEntry = entry.getValue(); + List removedProtectedStorageEntries = new ArrayList<>(entriesToRemove.size()); + entriesToRemove.forEach(entry -> { + ByteArray hashOfPayload = entry.getKey(); + ProtectedStorageEntry protectedStorageEntry = entry.getValue(); - //log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap()); - map.remove(hashOfPayload); - //log.trace("## removeFromMapAndDataStore: map after remove={}", printMap()); + //log.trace("## removeFromMapAndDataStore: hashOfPayload={}, map before remove={}", hashOfPayload, printMap()); + map.remove(hashOfPayload); + //log.trace("## removeFromMapAndDataStore: map after remove={}", printMap()); - // We inform listeners even the entry was not found in our map - removedProtectedStorageEntries.add(protectedStorageEntry); + // We inform listeners even the entry was not found in our map + removedProtectedStorageEntries.add(protectedStorageEntry); - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof PersistablePayload) { - ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry); - if (previous == null) - log.warn("We cannot remove the protectedStorageEntry from the protectedDataStoreService as it does not exist."); - } - }); + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof PersistablePayload) { + ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry); + if (previous == null) + log.warn("We cannot remove the protectedStorageEntry from the protectedDataStoreService as it does not exist."); + } + }); - hashMapChangedListeners.forEach(e -> e.onRemoved(removedProtectedStorageEntries)); + hashMapChangedListeners.forEach(e -> e.onRemoved(removedProtectedStorageEntries)); + } } private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfData) {