From a088f685c1d8a0f94a8ff71ff4ab2f20ddbdec34 Mon Sep 17 00:00:00 2001 From: woodser Date: Thu, 27 Jul 2023 08:02:29 -0400 Subject: [PATCH] process unposted offers within lock --- .../haveno/core/offer/OpenOfferManager.java | 188 +++++++++--------- 1 file changed, 99 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index 8181b9b6..232176d5 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -152,6 +152,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private static final long KEY_IMAGE_REFRESH_PERIOD_MS_LOCAL = 20000; // 20 seconds private static final long KEY_IMAGE_REFRESH_PERIOD_MS_REMOTE = 300000; // 5 minutes + private Object processOffersLock = new Object(); // lock for processing offers + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor, Initialization @@ -423,8 +425,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService) // .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg)))); - // process unposted offers - processUnpostedOffers((transaction) -> {}, (errorMessage) -> { + // process scheduled offers + processScheduledOffers((transaction) -> {}, (errorMessage) -> { log.warn("Error processing unposted offers: " + errorMessage); }); @@ -434,7 +436,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe @Override public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) { - processUnpostedOffers((transaction) -> {}, (errorMessage) -> { + processScheduledOffers((transaction) -> {}, (errorMessage) -> { log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post }); } @@ -498,17 +500,25 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // create open offer OpenOffer openOffer = new OpenOffer(offer, triggerPrice, reserveExactAmount); - // process open offer to schedule or post - processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> { - addOpenOffer(openOffer); - requestPersistence(); - resultHandler.handleResult(transaction); - }, (errorMessage) -> { - log.warn("Error processing unposted offer {}: {}", openOffer.getId(), errorMessage); - onCancelled(openOffer); - offer.setErrorMessage(errorMessage); - errorMessageHandler.handleErrorMessage(errorMessage); - }); + // schedule or post offer + new Thread(() -> { + synchronized (processOffersLock) { + CountDownLatch latch = new CountDownLatch(1); + processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> { + addOpenOffer(openOffer); + requestPersistence(); + latch.countDown(); + resultHandler.handleResult(transaction); + }, (errorMessage) -> { + log.warn("Error processing unposted offer {}: {}", openOffer.getId(), errorMessage); + onCancelled(openOffer); + offer.setErrorMessage(errorMessage); + latch.countDown(); + errorMessageHandler.handleErrorMessage(errorMessage); + }); + HavenoUtils.awaitLatch(latch); + } + }).start(); } // Remove from offerbook @@ -772,94 +782,94 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // Place offer helpers /////////////////////////////////////////////////////////////////////////////////////////// - private void processUnpostedOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler + private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler ErrorMessageHandler errorMessageHandler) { new Thread(() -> { - List errorMessages = new ArrayList(); - List openOffers = getOpenOffers(); - for (OpenOffer scheduledOffer : openOffers) { - if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; - CountDownLatch latch = new CountDownLatch(1); - processUnpostedOffer(openOffers, scheduledOffer, (transaction) -> { - latch.countDown(); - }, errorMessage -> { - log.warn("Error processing unposted offer {}: {}", scheduledOffer.getId(), errorMessage); - onCancelled(scheduledOffer); - errorMessages.add(errorMessage); - latch.countDown(); - }); - HavenoUtils.awaitLatch(latch); + synchronized (processOffersLock) { + List errorMessages = new ArrayList(); + List openOffers = getOpenOffers(); + for (OpenOffer scheduledOffer : openOffers) { + if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; + CountDownLatch latch = new CountDownLatch(1); + processUnpostedOffer(openOffers, scheduledOffer, (transaction) -> { + latch.countDown(); + }, errorMessage -> { + log.warn("Error processing unposted offer {}: {}", scheduledOffer.getId(), errorMessage); + onCancelled(scheduledOffer); + errorMessages.add(errorMessage); + latch.countDown(); + }); + HavenoUtils.awaitLatch(latch); + } + requestPersistence(); + if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString()); + else resultHandler.handleResult(null); } - requestPersistence(); - if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString()); - else resultHandler.handleResult(null); }).start(); } private void processUnpostedOffer(List openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { new Thread(() -> { - synchronized (xmrWalletService) { - try { + try { - // done processing if wallet not initialized - if (xmrWalletService.getWallet() == null) { - resultHandler.handleResult(null); + // done processing if wallet not initialized + if (xmrWalletService.getWallet() == null) { + resultHandler.handleResult(null); + return; + } + + // get offer reserve amount + BigInteger offerReserveAmount = openOffer.getOffer().getReserveAmount(); + + // handle split output offer + if (openOffer.isReserveExactAmount()) { + + // find tx with exact input amount + MoneroTxWallet splitOutputTx = findSplitOutputFundingTx(openOffers, openOffer); + if (splitOutputTx != null && openOffer.getScheduledTxHashes() == null) { + openOffer.setScheduledTxHashes(Arrays.asList(splitOutputTx.getHash())); + openOffer.setSplitOutputTxHash(splitOutputTx.getHash()); + openOffer.setScheduledAmount(offerReserveAmount.toString()); + openOffer.setState(OpenOffer.State.SCHEDULED); + } + + // if not found, create tx to split exact output + if (splitOutputTx == null) { + splitOrSchedule(openOffers, openOffer, offerReserveAmount); + } else if (!splitOutputTx.isLocked()) { + + // otherwise sign and post offer if split output available + signAndPostOffer(openOffer, true, resultHandler, (errMsg) -> { + + // on error, create new tx to split output if offer subaddress does not have exact output + int offerSubaddress = xmrWalletService.getOrCreateAddressEntry(openOffer.getId(), XmrAddressEntry.Context.OFFER_FUNDING).getSubaddressIndex(); + if (!splitOutputTx.getOutgoingTransfer().getSubaddressIndices().equals(Arrays.asList(offerSubaddress))) { + log.warn("Splitting new output because spending existing output(s) failed for offer {}. Split output tx subaddresses={}. Offer funding subadress={}", openOffer.getId(), splitOutputTx.getOutgoingTransfer().getSubaddressIndices(), offerSubaddress); + splitOrSchedule(openOffers, openOffer, offerReserveAmount); + resultHandler.handleResult(null); + } else { + errorMessageHandler.handleErrorMessage(errMsg); + } + }); return; } - - // get offer reserve amount - BigInteger offerReserveAmount = openOffer.getOffer().getReserveAmount(); - - // handle split output offer - if (openOffer.isReserveExactAmount()) { + } else { - // find tx with exact input amount - MoneroTxWallet splitOutputTx = findSplitOutputFundingTx(openOffers, openOffer); - if (splitOutputTx != null && openOffer.getScheduledTxHashes() == null) { - openOffer.setScheduledTxHashes(Arrays.asList(splitOutputTx.getHash())); - openOffer.setSplitOutputTxHash(splitOutputTx.getHash()); - openOffer.setScheduledAmount(offerReserveAmount.toString()); - openOffer.setState(OpenOffer.State.SCHEDULED); - } - - // if not found, create tx to split exact output - if (splitOutputTx == null) { - splitOrSchedule(openOffers, openOffer, offerReserveAmount); - } else if (!splitOutputTx.isLocked()) { - - // otherwise sign and post offer if split output available - signAndPostOffer(openOffer, true, resultHandler, (errMsg) -> { - - // on error, create new tx to split output if offer subaddress does not have exact output - int offerSubaddress = xmrWalletService.getOrCreateAddressEntry(openOffer.getId(), XmrAddressEntry.Context.OFFER_FUNDING).getSubaddressIndex(); - if (!splitOutputTx.getOutgoingTransfer().getSubaddressIndices().equals(Arrays.asList(offerSubaddress))) { - log.warn("Splitting new output because spending existing output(s) failed for offer {}. Split output tx subaddresses={}. Offer funding subadress={}", openOffer.getId(), splitOutputTx.getOutgoingTransfer().getSubaddressIndices(), offerSubaddress); - splitOrSchedule(openOffers, openOffer, offerReserveAmount); - resultHandler.handleResult(null); - } else { - errorMessageHandler.handleErrorMessage(errMsg); - } - }); - return; - } - } else { - - // handle sufficient balance - boolean hasSufficientBalance = xmrWalletService.getWallet().getUnlockedBalance(0).compareTo(offerReserveAmount) >= 0; - if (hasSufficientBalance) { - signAndPostOffer(openOffer, true, resultHandler, errorMessageHandler); - return; - } else if (openOffer.getScheduledTxHashes() == null) { - scheduleWithEarliestTxs(openOffers, openOffer); - } + // handle sufficient balance + boolean hasSufficientBalance = xmrWalletService.getWallet().getUnlockedBalance(0).compareTo(offerReserveAmount) >= 0; + if (hasSufficientBalance) { + signAndPostOffer(openOffer, true, resultHandler, errorMessageHandler); + return; + } else if (openOffer.getScheduledTxHashes() == null) { + scheduleWithEarliestTxs(openOffers, openOffer); } - - // handle result - resultHandler.handleResult(null); - } catch (Exception e) { - e.printStackTrace(); - errorMessageHandler.handleErrorMessage(e.getMessage()); } + + // handle result + resultHandler.handleResult(null); + } catch (Exception e) { + e.printStackTrace(); + errorMessageHandler.handleErrorMessage(e.getMessage()); } }).start(); } @@ -1090,7 +1100,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe synchronized (placeOfferProtocols) { placeOfferProtocols.put(openOffer.getOffer().getId(), placeOfferProtocol); } - placeOfferProtocol.placeOffer(); // TODO (woodser): if error placing offer (e.g. bad signature), remove protocol and unfreeze trade funds + placeOfferProtocol.placeOffer(); } ///////////////////////////////////////////////////////////////////////////////////////////