fix concurrent modification exception in disputes list

This commit is contained in:
woodser 2024-08-05 12:09:17 -04:00
parent d9a3feba8d
commit 3dfaa2fc52
7 changed files with 80 additions and 58 deletions

View file

@ -275,10 +275,12 @@ public class CoreDisputesService {
disputeResult.summaryNotesProperty().get() disputeResult.summaryNotesProperty().get()
); );
if (reason == DisputeResult.Reason.OPTION_TRADE && synchronized (dispute.getChatMessages()) {
if (reason == DisputeResult.Reason.OPTION_TRADE &&
dispute.getChatMessages().size() > 1 && dispute.getChatMessages().size() > 1 &&
dispute.getChatMessages().get(1).isSystemMessage()) { dispute.getChatMessages().get(1).isSystemMessage()) {
textToSign += "\n" + dispute.getChatMessages().get(1).getMessage() + "\n"; textToSign += "\n" + dispute.getChatMessages().get(1).getMessage() + "\n";
}
} }
String summaryText = DisputeSummaryVerification.signAndApply(disputeManager, disputeResult, textToSign); String summaryText = DisputeSummaryVerification.signAndApply(disputeManager, disputeResult, textToSign);

View file

@ -114,15 +114,17 @@ public class DisputeMsgEvents {
// We check at every new message if it might be a message sent after the dispute had been closed. If that is the // We check at every new message if it might be a message sent after the dispute had been closed. If that is the
// case we revert the isClosed flag so that the UI can reopen the dispute and indicate that a new dispute // case we revert the isClosed flag so that the UI can reopen the dispute and indicate that a new dispute
// message arrived. // message arrived.
ObservableList<ChatMessage> chatMessages = dispute.getChatMessages(); synchronized (dispute.getChatMessages()) {
// If last message is not a result message we re-open as we might have received a new message from the ObservableList<ChatMessage> chatMessages = dispute.getChatMessages();
// trader/mediator/arbitrator who has reopened the case // If last message is not a result message we re-open as we might have received a new message from the
if (dispute.isClosed() && !chatMessages.isEmpty() && !chatMessages.get(chatMessages.size() - 1).isResultMessage(dispute)) { // trader/mediator/arbitrator who has reopened the case
dispute.reOpen(); if (dispute.isClosed() && !chatMessages.isEmpty() && !chatMessages.get(chatMessages.size() - 1).isResultMessage(dispute)) {
if (dispute.getSupportType() == SupportType.MEDIATION) { dispute.reOpen();
mediationManager.requestPersistence(); if (dispute.getSupportType() == SupportType.MEDIATION) {
} else if (dispute.getSupportType() == SupportType.REFUND) { mediationManager.requestPersistence();
refundManager.requestPersistence(); } else if (dispute.getSupportType() == SupportType.REFUND) {
refundManager.requestPersistence();
}
} }
} }
} }

View file

@ -192,17 +192,19 @@ public abstract class SupportManager {
if (ackMessage.getSourceMsgClassName().equals(ChatMessage.class.getSimpleName())) { if (ackMessage.getSourceMsgClassName().equals(ChatMessage.class.getSimpleName())) {
Trade trade = tradeManager.getTrade(ackMessage.getSourceId()); Trade trade = tradeManager.getTrade(ackMessage.getSourceId());
for (Dispute dispute : trade.getDisputes()) { for (Dispute dispute : trade.getDisputes()) {
for (ChatMessage chatMessage : dispute.getChatMessages()) { synchronized (dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) { for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (trade.getDisputeState() == Trade.DisputeState.DISPUTE_REQUESTED) { if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
if (dispute.isClosed()) dispute.reOpen(); if (trade.getDisputeState() == Trade.DisputeState.DISPUTE_REQUESTED) {
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED); if (dispute.isClosed()) dispute.reOpen();
} else if (dispute.isClosed()) { trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED);
trade.pollWalletNormallyForMs(30000); // sync to check for payout } else if (dispute.isClosed()) {
trade.pollWalletNormallyForMs(30000); // sync to check for payout
}
}
} }
} }
} }
}
} }
} else { } else {
log.warn("Received AckMessage with error state for {} with tradeId={}, sender={}, errorMessage={}", log.warn("Received AckMessage with error state for {} with tradeId={}, sender={}, errorMessage={}",
@ -212,12 +214,14 @@ public abstract class SupportManager {
if (ackMessage.getSourceMsgClassName().equals(ChatMessage.class.getSimpleName())) { if (ackMessage.getSourceMsgClassName().equals(ChatMessage.class.getSimpleName())) {
Trade trade = tradeManager.getTrade(ackMessage.getSourceId()); Trade trade = tradeManager.getTrade(ackMessage.getSourceId());
for (Dispute dispute : trade.getDisputes()) { for (Dispute dispute : trade.getDisputes()) {
for (ChatMessage chatMessage : dispute.getChatMessages()) { synchronized (dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) { for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (trade.getDisputeState().isCloseRequested()) { if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
log.warn("DisputeCloseMessage was nacked. We close the dispute now. tradeId={}, nack sender={}", trade.getId(), ackMessage.getSenderNodeAddress()); if (trade.getDisputeState().isCloseRequested()) {
dispute.setIsClosed(); log.warn("DisputeCloseMessage was nacked. We close the dispute now. tradeId={}, nack sender={}", trade.getId(), ackMessage.getSenderNodeAddress());
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_CLOSED); dispute.setIsClosed();
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_CLOSED);
}
} }
} }
} }

View file

@ -353,10 +353,12 @@ public final class Dispute implements NetworkPayload, PersistablePayload {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void addAndPersistChatMessage(ChatMessage chatMessage) { public void addAndPersistChatMessage(ChatMessage chatMessage) {
if (!chatMessages.contains(chatMessage)) { synchronized (chatMessages) {
chatMessages.add(chatMessage); if (!chatMessages.contains(chatMessage)) {
} else { chatMessages.add(chatMessage);
log.error("disputeDirectMessage already exists"); } else {
log.error("disputeDirectMessage already exists");
}
} }
} }
@ -365,13 +367,15 @@ public final class Dispute implements NetworkPayload, PersistablePayload {
} }
public boolean removeAllChatMessages() { public boolean removeAllChatMessages() {
if (chatMessages.size() > 1) { synchronized (chatMessages) {
// removes all chat except the initial guidelines message. if (chatMessages.size() > 1) {
String firstMessageUid = chatMessages.get(0).getUid(); // removes all chat except the initial guidelines message.
chatMessages.removeIf((msg) -> !msg.getUid().equals(firstMessageUid)); String firstMessageUid = chatMessages.get(0).getUid();
return true; chatMessages.removeIf((msg) -> !msg.getUid().equals(firstMessageUid));
return true;
}
return false;
} }
return false;
} }
public void maybeClearSensitiveData() { public void maybeClearSensitiveData() {

View file

@ -90,12 +90,14 @@ public abstract class DisputeListService<T extends DisputeList<Dispute>> impleme
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void cleanupDisputes(@Nullable Consumer<String> closedDisputeHandler) { public void cleanupDisputes(@Nullable Consumer<String> closedDisputeHandler) {
disputeList.stream().forEach(dispute -> { synchronized (disputeList.getObservableList()) {
String tradeId = dispute.getTradeId(); disputeList.stream().forEach(dispute -> {
if (dispute.isClosed() && closedDisputeHandler != null) { String tradeId = dispute.getTradeId();
closedDisputeHandler.accept(tradeId); if (dispute.isClosed() && closedDisputeHandler != null) {
} closedDisputeHandler.accept(tradeId);
}); }
});
}
} }
@ -130,7 +132,9 @@ public abstract class DisputeListService<T extends DisputeList<Dispute>> impleme
} }
ObservableList<Dispute> getObservableList() { ObservableList<Dispute> getObservableList() {
return disputeList.getObservableList(); synchronized (disputeList.getObservableList()) {
return disputeList.getObservableList();
}
} }
@ -151,10 +155,12 @@ public abstract class DisputeListService<T extends DisputeList<Dispute>> impleme
isAlerting -> { isAlerting -> {
// We get the event before the list gets updated, so we execute on next frame // We get the event before the list gets updated, so we execute on next frame
UserThread.execute(() -> { UserThread.execute(() -> {
int numAlerts = (int) disputeList.getList().stream() synchronized (disputeList.getObservableList()) {
.mapToLong(x -> x.getBadgeCountProperty().getValue()) int numAlerts = (int) disputeList.getList().stream()
.sum(); .mapToLong(x -> x.getBadgeCountProperty().getValue())
numOpenDisputes.set(numAlerts); .sum();
numOpenDisputes.set(numAlerts);
}
}); });
}); });
disputedTradeIds.add(dispute.getTradeId()); disputedTradeIds.add(dispute.getTradeId());

View file

@ -187,10 +187,12 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
@Override @Override
public List<ChatMessage> getAllChatMessages(String tradeId) { public List<ChatMessage> getAllChatMessages(String tradeId) {
return getDisputeList().stream() synchronized (getDisputeList().getObservableList()) {
.filter(dispute -> dispute.getTradeId().equals(tradeId)) return getDisputeList().stream()
.flatMap(dispute -> dispute.getChatMessages().stream()) .filter(dispute -> dispute.getTradeId().equals(tradeId))
.collect(Collectors.toList()); .flatMap(dispute -> dispute.getChatMessages().stream())
.collect(Collectors.toList());
}
} }
@Override @Override
@ -239,7 +241,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
} }
public ObservableList<Dispute> getDisputesAsObservableList() { public ObservableList<Dispute> getDisputesAsObservableList() {
synchronized(disputeListService.getDisputeList()) { synchronized(disputeListService.getDisputeList().getObservableList()) {
return disputeListService.getObservableList(); return disputeListService.getObservableList();
} }
} }
@ -249,7 +251,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
} }
protected T getDisputeList() { protected T getDisputeList() {
synchronized(disputeListService.getDisputeList()) { synchronized(disputeListService.getDisputeList().getObservableList()) {
return disputeListService.getDisputeList(); return disputeListService.getDisputeList();
} }
} }
@ -354,7 +356,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
return; return;
} }
synchronized (disputeList) { synchronized (disputeList.getObservableList()) {
if (disputeList.contains(dispute)) { if (disputeList.contains(dispute)) {
String msg = "We got a dispute msg that we have already stored. TradeId = " + dispute.getTradeId() + ", DisputeId = " + dispute.getId(); String msg = "We got a dispute msg that we have already stored. TradeId = " + dispute.getTradeId() + ", DisputeId = " + dispute.getId();
log.warn(msg); log.warn(msg);

View file

@ -287,10 +287,12 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
// set dispute state // set dispute state
cleanupRetryMap(uid); cleanupRetryMap(uid);
if (!dispute.getChatMessages().contains(chatMessage)) { synchronized (dispute.getChatMessages()) {
dispute.addAndPersistChatMessage(chatMessage); if (!dispute.getChatMessages().contains(chatMessage)) {
} else { dispute.addAndPersistChatMessage(chatMessage);
log.warn("We got a dispute mail msg that we have already stored. TradeId = " + chatMessage.getTradeId()); } else {
log.warn("We got a dispute mail msg that we have already stored. TradeId = " + chatMessage.getTradeId());
}
} }
dispute.setIsClosed(); dispute.setIsClosed();
if (dispute.disputeResultProperty().get() != null) { if (dispute.disputeResultProperty().get() != null) {