From 9d3855ad2eefd457c9465a4dee252e1843d1a9d7 Mon Sep 17 00:00:00 2001 From: woodser Date: Fri, 30 Dec 2022 13:45:21 +0000 Subject: [PATCH] synchronize open offers to fix concurrent modification exception --- .../java/bisq/core/api/CoreOffersService.java | 2 +- .../bisq/core/offer/OpenOfferManager.java | 88 +++++++++++++------ 2 files changed, 64 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/bisq/core/api/CoreOffersService.java b/core/src/main/java/bisq/core/api/CoreOffersService.java index 30763dfe..d11688f1 100644 --- a/core/src/main/java/bisq/core/api/CoreOffersService.java +++ b/core/src/main/java/bisq/core/api/CoreOffersService.java @@ -128,7 +128,7 @@ public class CoreOffersService { } List getMyOffers() { - List offers = new ArrayList<>(openOfferManager.getObservableList()).stream() + List offers = openOfferManager.getOpenOffers().stream() .map(OpenOffer::getOffer) .filter(o -> o.isMyOffer(keyRing)) .collect(Collectors.toList()); diff --git a/core/src/main/java/bisq/core/offer/OpenOfferManager.java b/core/src/main/java/bisq/core/offer/OpenOfferManager.java index 7e45ec19..cee11ebd 100644 --- a/core/src/main/java/bisq/core/offer/OpenOfferManager.java +++ b/core/src/main/java/bisq/core/offer/OpenOfferManager.java @@ -444,8 +444,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe OpenOffer openOffer = new OpenOffer(offer, triggerPrice, autoSplit); // process open offer to schedule or post - processUnpostedOffer(openOffer, (transaction) -> { - openOffers.add(openOffer); + processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> { + addOpenOffer(openOffer); requestPersistence(); resultHandler.handleResult(transaction); }, (errMessage) -> { @@ -550,12 +550,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe openOffer.getOffer().setState(Offer.State.REMOVED); openOffer.setState(OpenOffer.State.CANCELED); - openOffers.remove(openOffer); + removeOpenOffer(openOffer); OpenOffer editedOpenOffer = new OpenOffer(editedOffer, triggerPrice); editedOpenOffer.setState(originalState); - openOffers.add(editedOpenOffer); + addOpenOffer(editedOpenOffer); if (!editedOpenOffer.isDeactivated()) republishOffer(editedOpenOffer); @@ -592,7 +592,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } offer.setState(Offer.State.REMOVED); openOffer.setState(OpenOffer.State.CANCELED); - openOffers.remove(openOffer); + removeOpenOffer(openOffer); closedTradableManager.add(openOffer); xmrWalletService.resetAddressEntriesForOpenOffer(offer.getId()); log.info("onRemoved offerId={}", offer.getId()); @@ -602,7 +602,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // Close openOffer after deposit published public void closeOpenOffer(Offer offer) { getOpenOfferById(offer.getId()).ifPresent(openOffer -> { - openOffers.remove(openOffer); + removeOpenOffer(openOffer); openOffer.setState(OpenOffer.State.CLOSED); xmrWalletService.resetAddressEntriesForOpenOffer(offer.getId()); offerBookService.removeOffer(openOffer.getOffer().getOfferPayload(), @@ -630,16 +630,50 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe return offer.isMyOffer(keyRing); } + public List getOpenOffers() { + synchronized (openOffers) { + return new ArrayList<>(getObservableList()); + } + } + + public List getSignedOffers() { + synchronized (signedOffers) { + return new ArrayList<>(signedOffers.getObservableList()); + } + } + public ObservableList getObservableList() { return openOffers.getObservableList(); } public Optional getOpenOfferById(String offerId) { - return new ArrayList<>(openOffers.getObservableList()).stream().filter(e -> e.getId().equals(offerId)).findFirst(); + synchronized (openOffers) { + return openOffers.stream().filter(e -> e.getId().equals(offerId)).findFirst(); + } } public Optional getSignedOfferById(String offerId) { - return new ArrayList<>(signedOffers.getObservableList()).stream().filter(e -> e.getOfferId().equals(offerId)).findFirst(); + synchronized (signedOffers) { + return signedOffers.stream().filter(e -> e.getOfferId().equals(offerId)).findFirst(); + } + } + + private void addOpenOffer(OpenOffer openOffer) { + synchronized (openOffers) { + openOffers.add(openOffer); + } + } + + private void removeOpenOffer(OpenOffer openOffer) { + synchronized (openOffers) { + openOffers.remove(openOffer); + } + } + + private void addSignedOffer(SignedOffer openOffer) { + synchronized (signedOffers) { + signedOffers.add(openOffer); + } } /////////////////////////////////////////////////////////////////////////////////////////// @@ -650,10 +684,11 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe ErrorMessageHandler errorMessageHandler) { new Thread(() -> { List errorMessages = new ArrayList(); - for (OpenOffer scheduledOffer : new ArrayList(openOffers.getObservableList())) { + List openOffers = getOpenOffers(); + for (OpenOffer scheduledOffer : openOffers) { if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; CountDownLatch latch = new CountDownLatch(1); - processUnpostedOffer(scheduledOffer, (transaction) -> { + processUnpostedOffer(openOffers, scheduledOffer, (transaction) -> { latch.countDown(); }, errorMessage -> { onRemoved(scheduledOffer); @@ -668,7 +703,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe }).start(); } - private void processUnpostedOffer(OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { + private void processUnpostedOffer(List openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { new Thread(() -> { try { @@ -703,7 +738,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe log.info("Scheduling offer " + openOffer.getId()); // check for sufficient balance - scheduled offers amount - if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount()).compareTo(offerReserveAmount) < 0) { + if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount(openOffers)).compareTo(offerReserveAmount) < 0) { throw new RuntimeException("Not enough money in Haveno wallet"); } @@ -714,7 +749,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe List scheduledTxHashes = new ArrayList(); BigInteger scheduledAmount = new BigInteger("0"); for (MoneroTxWallet lockedTx : lockedTxs) { - if (isTxScheduled(lockedTx.getHash())) continue; + if (isTxScheduled(openOffers, lockedTx.getHash())) continue; if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue; scheduledTxHashes.add(lockedTx.getHash()); for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) { @@ -739,9 +774,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe }).start(); } - private BigInteger getScheduledAmount() { + private BigInteger getScheduledAmount(List openOffers) { BigInteger scheduledAmount = new BigInteger("0"); - for (OpenOffer openOffer : openOffers.getObservableList()) { + for (OpenOffer openOffer : openOffers) { if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue; if (openOffer.getScheduledTxHashes() == null) continue; List fundingTxs = xmrWalletService.getWallet().getTxs(openOffer.getScheduledTxHashes()); @@ -754,8 +789,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe return scheduledAmount; } - private boolean isTxScheduled(String txHash) { - for (OpenOffer openOffer : openOffers.getObservableList()) { + private boolean isTxScheduled(List openOffers, String txHash) { + for (OpenOffer openOffer : openOffers) { if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue; if (openOffer.getScheduledTxHashes() == null) continue; for (String scheduledTxHash : openOffer.getScheduledTxHashes()) { @@ -888,7 +923,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // create record of signed offer SignedOffer signedOffer = new SignedOffer(signedOfferPayload.getId(), request.getReserveTxHash(), request.getReserveTxHex(), signature); // TODO (woodser): no need for signature to be part of SignedOffer? - signedOffers.add(signedOffer); + addSignedOffer(signedOffer); requestPersistence(); // send response with signature @@ -1133,7 +1168,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private void maybeUpdatePersistedOffers() { // We need to clone to avoid ConcurrentModificationException - ArrayList openOffersClone = new ArrayList<>(openOffers.getList()); + List openOffersClone = getOpenOffers(); openOffersClone.forEach(originalOpenOffer -> { Offer originalOffer = originalOpenOffer.getOffer(); @@ -1227,7 +1262,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // remove old offer originalOffer.setState(Offer.State.REMOVED); originalOpenOffer.setState(OpenOffer.State.CANCELED); - openOffers.remove(originalOpenOffer); + removeOpenOffer(originalOpenOffer); // Create new Offer Offer updatedOffer = new Offer(updatedPayload); @@ -1236,7 +1271,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, originalOpenOffer.getTriggerPrice()); updatedOpenOffer.setState(originalOpenOfferState); - openOffers.add(updatedOpenOffer); + addOpenOffer(updatedOpenOffer); requestPersistence(); log.info("Updating offer completed. id={}", originalOffer.getId()); @@ -1256,8 +1291,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe stopPeriodicRefreshOffersTimer(); - List openOffersList = new ArrayList<>(openOffers.getList()); - processListForRepublishOffers(openOffersList); + processListForRepublishOffers(getOpenOffers()); } private void processListForRepublishOffers(List list) { @@ -1266,13 +1300,17 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } OpenOffer openOffer = list.remove(0); - if (openOffers.contains(openOffer) && !openOffer.isDeactivated()) { + boolean contained = false; + synchronized (openOffers) { + contained = openOffers.contains(openOffer); + } + if (contained && !openOffer.isDeactivated()) { // TODO It is not clear yet if it is better for the node and the network to send out all add offer // messages in one go or to spread it over a delay. With power users who have 100-200 offers that can have // some significant impact to user experience and the network republishOffer(openOffer, () -> processListForRepublishOffers(list)); - /* republishOffer(openOffer, + /* republishOffer(openOffer, () -> UserThread.runAfter(() -> processListForRepublishOffers(list), 30, TimeUnit.MILLISECONDS));*/ } else {