From 0d60df2aa7013de4619ac0ae2997c7f0360e189a Mon Sep 17 00:00:00 2001 From: woodser Date: Fri, 5 Jan 2024 13:25:29 -0500 Subject: [PATCH] refactor trade and connection threading to new ThreadUtils --- .../main/java/haveno/common/ThreadUtils.java | 144 +++++ .../haveno/core/api/XmrConnectionService.java | 3 +- .../haveno/core/app/HavenoExecutable.java | 5 +- .../app/misc/ExecutableForAppWithP2p.java | 5 +- .../haveno/core/offer/OpenOfferManager.java | 18 +- .../core/support/dispute/DisputeManager.java | 185 +++---- .../arbitration/ArbitrationManager.java | 249 ++++----- .../java/haveno/core/trade/HavenoUtils.java | 93 ---- .../main/java/haveno/core/trade/Trade.java | 98 ++-- .../java/haveno/core/trade/TradeManager.java | 14 +- .../trade/protocol/BuyerAsMakerProtocol.java | 5 +- .../trade/protocol/BuyerAsTakerProtocol.java | 5 +- .../trade/protocol/SellerAsMakerProtocol.java | 5 +- .../trade/protocol/SellerAsTakerProtocol.java | 5 +- .../core/trade/protocol/TradeProtocol.java | 491 +++++++++--------- .../core/xmr/wallet/XmrWalletService.java | 35 +- .../network/p2p/network/Connection.java | 31 +- 17 files changed, 739 insertions(+), 652 deletions(-) create mode 100644 common/src/main/java/haveno/common/ThreadUtils.java diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java new file mode 100644 index 00000000..c4cb5c4b --- /dev/null +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -0,0 +1,144 @@ +/* + * This file is part of Haveno. + * + * Haveno is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Haveno is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Haveno. If not, see . + */ + +package haveno.common; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class ThreadUtils { + + private static final Map EXECUTORS = new HashMap<>(); + private static final Map THREAD_BY_ID = new HashMap<>(); + private static final int POOL_SIZE = 10; + private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); + + + public static void execute(Runnable command, String threadId) { + synchronized (EXECUTORS) { + if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1)); + EXECUTORS.get(threadId).execute(() -> { + synchronized (THREAD_BY_ID) { + THREAD_BY_ID.put(threadId, Thread.currentThread()); + } + command.run(); + }); + } + } + + public static void await(Runnable command, String threadId) { + if (isCurrentThread(Thread.currentThread(), threadId)) { + command.run(); + } else { + CountDownLatch latch = new CountDownLatch(1); + execute(command, threadId); // run task + execute(() -> latch.countDown(), threadId); // await next tick + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public static void shutDown(String threadId, long timeoutMs) { + ExecutorService pool = null; + synchronized (EXECUTORS) { + if (!EXECUTORS.containsKey(threadId)) return; // thread not found + pool = EXECUTORS.get(threadId); + } + pool.shutdown(); + try { + if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); + } catch (InterruptedException e) { + pool.shutdownNow(); + throw new RuntimeException(e); + } finally { + synchronized (EXECUTORS) { + EXECUTORS.remove(threadId); + } + synchronized (THREAD_BY_ID) { + THREAD_BY_ID.remove(threadId); + } + } + } + + public static Future submitToPool(Runnable task) { + return submitToPool(Arrays.asList(task)).get(0); + } + + public static List> submitToPool(List tasks) { + List> futures = new ArrayList<>(); + for (Runnable task : tasks) futures.add(POOL.submit(task)); + return futures; + } + + // TODO: these are unused; remove? use monero-java awaitTasks() when updated + + public static Future awaitTask(Runnable task) { + return awaitTasks(Arrays.asList(task)).get(0); + } + + public static List> awaitTasks(Collection tasks) { + return awaitTasks(tasks, tasks.size()); + } + + public static List> awaitTasks(Collection tasks, int maxConcurrency) { + return awaitTasks(tasks, maxConcurrency, null); + } + + public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) { + List> futures = new ArrayList<>(); + if (tasks.isEmpty()) return futures; + ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency); + for (Runnable task : tasks) futures.add(pool.submit(task)); + pool.shutdown(); + + // interrupt after timeout + if (timeoutSeconds != null) { + try { + if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow(); + } catch (InterruptedException e) { + pool.shutdownNow(); + throw new RuntimeException(e); + } + } + + // throw exception from any tasks + try { + for (Future future : futures) future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return futures; + } + + private static boolean isCurrentThread(Thread thread, String threadId) { + synchronized (THREAD_BY_ID) { + if (!THREAD_BY_ID.containsKey(threadId)) return false; + return thread == THREAD_BY_ID.get(threadId); + } + } +} diff --git a/core/src/main/java/haveno/core/api/XmrConnectionService.java b/core/src/main/java/haveno/core/api/XmrConnectionService.java index 66b8b846..65a4eaf1 100644 --- a/core/src/main/java/haveno/core/api/XmrConnectionService.java +++ b/core/src/main/java/haveno/core/api/XmrConnectionService.java @@ -1,5 +1,6 @@ package haveno.core.api; +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.common.app.DevEnv; import haveno.common.config.BaseCurrencyNetwork; @@ -562,7 +563,7 @@ public final class XmrConnectionService { // notify listeners in parallel synchronized (listenerLock) { for (MoneroConnectionManagerListener listener : listeners) { - HavenoUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection)); + ThreadUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection)); } } } diff --git a/core/src/main/java/haveno/core/app/HavenoExecutable.java b/core/src/main/java/haveno/core/app/HavenoExecutable.java index 1a7434bc..cbbc9b31 100644 --- a/core/src/main/java/haveno/core/app/HavenoExecutable.java +++ b/core/src/main/java/haveno/core/app/HavenoExecutable.java @@ -19,6 +19,8 @@ package haveno.core.app; import com.google.inject.Guice; import com.google.inject.Injector; + +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.common.app.AppModule; import haveno.common.config.Config; @@ -41,7 +43,6 @@ import haveno.core.provider.price.PriceFeedService; import haveno.core.setup.CorePersistedDataHost; import haveno.core.setup.CoreSetup; import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager; -import haveno.core.trade.HavenoUtils; import haveno.core.trade.TradeManager; import haveno.core.trade.statistics.TradeStatisticsManager; import haveno.core.xmr.setup.WalletsSetup; @@ -340,7 +341,7 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); try { - HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout + ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout } catch (Exception e) { e.printStackTrace(); } diff --git a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java index 0fe88114..ea44c0b3 100644 --- a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java +++ b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java @@ -18,6 +18,8 @@ package haveno.core.app.misc; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.common.app.DevEnv; import haveno.common.config.Config; @@ -33,7 +35,6 @@ import haveno.core.offer.OfferBookService; import haveno.core.offer.OpenOfferManager; import haveno.core.provider.price.PriceFeedService; import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager; -import haveno.core.trade.HavenoUtils; import haveno.core.trade.TradeManager; import haveno.core.trade.statistics.TradeStatisticsManager; import haveno.core.xmr.setup.WalletsSetup; @@ -103,7 +104,7 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable { tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); try { - HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout + ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout } catch (Exception e) { e.printStackTrace(); } diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index 27bf6fd9..1de7fbcd 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -18,6 +18,7 @@ package haveno.core.offer; import common.utils.GenUtils; +import haveno.common.ThreadUtils; import haveno.common.Timer; import haveno.common.UserThread; import haveno.common.app.Capabilities; @@ -150,6 +151,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // poll key images of signed offers private XmrKeyImagePoller signedOfferKeyImagePoller; + private static final long SHUTDOWN_TIMEOUT_MS = 90000; private static final long KEY_IMAGE_REFRESH_PERIOD_MS_LOCAL = 20000; // 20 seconds private static final long KEY_IMAGE_REFRESH_PERIOD_MS_REMOTE = 300000; // 5 minutes @@ -301,7 +303,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } public void shutDown(@Nullable Runnable completeHandler) { - HavenoUtils.removeThreadId(THREAD_ID); stopped = true; p2PService.getPeerManager().removeListener(this); p2PService.removeDecryptedDirectMessageListener(this); @@ -316,7 +317,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe int size = openOffers.size(); log.info("Remove open offers at shutDown. Number of open offers: {}", size); if (offerBookService.isBootstrapped() && size > 0) { - HavenoUtils.submitToThread(() -> { // finish tasks + ThreadUtils.execute(() -> { // finish tasks UserThread.execute(() -> { openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())); @@ -337,6 +338,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe if (completeHandler != null) completeHandler.run(); } + + // shut down pool + ThreadUtils.shutDown(THREAD_ID, SHUTDOWN_TIMEOUT_MS); } public void removeAllOpenOffers(@Nullable Runnable completeHandler) { @@ -400,7 +404,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe maybeUpdatePersistedOffers(); - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { // Wait for prices to be available priceFeedService.awaitExternalPrices(); @@ -506,7 +510,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe OpenOffer openOffer = new OpenOffer(offer, triggerPrice, reserveExactAmount); // schedule or post offer - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { synchronized (processOffersLock) { CountDownLatch latch = new CountDownLatch(1); processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> { @@ -807,7 +811,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler ErrorMessageHandler errorMessageHandler) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { synchronized (processOffersLock) { List errorMessages = new ArrayList(); List openOffers = getOpenOffers(); @@ -1571,7 +1575,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe stopPeriodicRefreshOffersTimer(); - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { processListForRepublishOffers(getOpenOffers()); }, THREAD_ID); } @@ -1607,7 +1611,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { // determine if offer is valid boolean isValid = true; diff --git a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java index 256ad8c4..9d81758b 100644 --- a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java +++ b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java @@ -17,6 +17,7 @@ package haveno.core.support.dispute; +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.common.app.Version; import haveno.common.config.Config; @@ -447,106 +448,108 @@ public abstract class DisputeManager> extends Sup return; } - synchronized (trade) { - String errorMessage = null; - PubKeyRing senderPubKeyRing = null; - try { - - // initialize - T disputeList = getDisputeList(); - if (disputeList == null) { - log.warn("disputes is null"); - return; - } - dispute.setSupportType(message.getSupportType()); - dispute.setState(Dispute.State.NEW); - Contract contract = dispute.getContract(); - - // validate dispute + ThreadUtils.execute(() -> { + synchronized (trade) { + String errorMessage = null; + PubKeyRing senderPubKeyRing = null; try { - DisputeValidation.validateDisputeData(dispute); - DisputeValidation.validateNodeAddresses(dispute, config); - DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress()); - //DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList()); - } catch (DisputeValidation.ValidationException e) { - e.printStackTrace(); - validationExceptions.add(e); - throw e; - } - // try to validate payment account - // TODO: add field to dispute details: valid, invalid, missing - try { - DisputeValidation.validatePaymentAccountPayload(dispute); + // initialize + T disputeList = getDisputeList(); + if (disputeList == null) { + log.warn("disputes is null"); + return; + } + dispute.setSupportType(message.getSupportType()); + dispute.setState(Dispute.State.NEW); + Contract contract = dispute.getContract(); + + // validate dispute + try { + DisputeValidation.validateDisputeData(dispute); + DisputeValidation.validateNodeAddresses(dispute, config); + DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress()); + //DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList()); + } catch (DisputeValidation.ValidationException e) { + e.printStackTrace(); + validationExceptions.add(e); + throw e; + } + + // try to validate payment account + // TODO: add field to dispute details: valid, invalid, missing + try { + DisputeValidation.validatePaymentAccountPayload(dispute); + } catch (Exception e) { + e.printStackTrace(); + log.warn(e.getMessage()); + trade.prependErrorMessage(e.getMessage()); + } + + // get sender + senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing(); + TradePeer sender = trade.getTradePeer(senderPubKeyRing); + if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller"); + + // message to trader is expected from arbitrator + if (!trade.isArbitrator() && sender != trade.getArbitrator()) { + throw new RuntimeException(message.getClass().getSimpleName() + " to trader is expected only from arbitrator"); + } + + // arbitrator verifies signature of payment sent message if given + if (trade.isArbitrator() && message.getPaymentSentMessage() != null) { + HavenoUtils.verifyPaymentSentMessage(trade, message.getPaymentSentMessage()); + trade.getBuyer().setUpdatedMultisigHex(message.getPaymentSentMessage().getUpdatedMultisigHex()); + trade.advanceState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); + } + + // update multisig hex + if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex()); + if (trade.walletExists()) trade.importMultisigHex(); + + // add chat message with price info + if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0); + + // add dispute + synchronized (disputeList) { + if (!disputeList.contains(dispute)) { + Optional storedDisputeOptional = findDispute(dispute); + if (!storedDisputeOptional.isPresent()) { + disputeList.add(dispute); + trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED); + + // send dispute opened message to peer if arbitrator + if (trade.isArbitrator()) sendDisputeOpenedMessageToPeer(dispute, contract, dispute.isDisputeOpenerIsBuyer() ? contract.getSellerPubKeyRing() : contract.getBuyerPubKeyRing(), trade.getSelf().getUpdatedMultisigHex()); + tradeManager.requestPersistence(); + errorMessage = null; + } else { + // valid case if both have opened a dispute and agent was not online + log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId()); + } + + // add chat message with mediation info if applicable + addMediationResultMessage(dispute); + } else { + throw new RuntimeException("We got a dispute msg that we have already stored. TradeId = " + dispute.getTradeId()); + } + } } catch (Exception e) { e.printStackTrace(); - log.warn(e.getMessage()); - trade.prependErrorMessage(e.getMessage()); + errorMessage = e.getMessage(); + log.warn(errorMessage); + if (trade != null) trade.setErrorMessage(errorMessage); } - // get sender - senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing(); - TradePeer sender = trade.getTradePeer(senderPubKeyRing); - if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller"); - - // message to trader is expected from arbitrator - if (!trade.isArbitrator() && sender != trade.getArbitrator()) { - throw new RuntimeException(message.getClass().getSimpleName() + " to trader is expected only from arbitrator"); + // use chat message instead of open dispute message for the ack + ObservableList messages = message.getDispute().getChatMessages(); + if (!messages.isEmpty()) { + ChatMessage msg = messages.get(0); + sendAckMessage(msg, senderPubKeyRing, errorMessage == null, errorMessage); } - // arbitrator verifies signature of payment sent message if given - if (trade.isArbitrator() && message.getPaymentSentMessage() != null) { - HavenoUtils.verifyPaymentSentMessage(trade, message.getPaymentSentMessage()); - trade.getBuyer().setUpdatedMultisigHex(message.getPaymentSentMessage().getUpdatedMultisigHex()); - trade.advanceState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); - } - - // update multisig hex - if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex()); - if (trade.walletExists()) trade.importMultisigHex(); - - // add chat message with price info - if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0); - - // add dispute - synchronized (disputeList) { - if (!disputeList.contains(dispute)) { - Optional storedDisputeOptional = findDispute(dispute); - if (!storedDisputeOptional.isPresent()) { - disputeList.add(dispute); - trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED); - - // send dispute opened message to peer if arbitrator - if (trade.isArbitrator()) sendDisputeOpenedMessageToPeer(dispute, contract, dispute.isDisputeOpenerIsBuyer() ? contract.getSellerPubKeyRing() : contract.getBuyerPubKeyRing(), trade.getSelf().getUpdatedMultisigHex()); - tradeManager.requestPersistence(); - errorMessage = null; - } else { - // valid case if both have opened a dispute and agent was not online - log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId()); - } - - // add chat message with mediation info if applicable - addMediationResultMessage(dispute); - } else { - throw new RuntimeException("We got a dispute msg that we have already stored. TradeId = " + dispute.getTradeId()); - } - } - } catch (Exception e) { - e.printStackTrace(); - errorMessage = e.getMessage(); - log.warn(errorMessage); - if (trade != null) trade.setErrorMessage(errorMessage); + requestPersistence(); } - - // use chat message instead of open dispute message for the ack - ObservableList messages = message.getDispute().getChatMessages(); - if (!messages.isEmpty()) { - ChatMessage msg = messages.get(0); - sendAckMessage(msg, senderPubKeyRing, errorMessage == null, errorMessage); - } - - requestPersistence(); - } + }, trade.getId()); } // arbitrator sends dispute opened message to opener's peer diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index 4802d7c8..e0b25185 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -20,6 +20,7 @@ package haveno.core.support.dispute.arbitration; import com.google.inject.Inject; import com.google.inject.Singleton; import common.utils.GenUtils; +import haveno.common.ThreadUtils; import haveno.common.Timer; import haveno.common.UserThread; import haveno.common.app.Version; @@ -116,7 +117,7 @@ public final class ArbitrationManager extends DisputeManager { + ThreadUtils.execute(() -> { if (message instanceof DisputeOpenedMessage) { handleDisputeOpenedMessage((DisputeOpenedMessage) message); } else if (message instanceof ChatMessage) { @@ -187,144 +188,146 @@ public final class ArbitrationManager extends DisputeManager { + ChatMessage chatMessage = null; + Dispute dispute = null; + synchronized (trade) { + try { + DisputeResult disputeResult = disputeClosedMessage.getDisputeResult(); + chatMessage = disputeResult.getChatMessage(); + checkNotNull(chatMessage, "chatMessage must not be null"); + String tradeId = disputeResult.getTradeId(); - log.info("Processing {} for {} {}", disputeClosedMessage.getClass().getSimpleName(), trade.getClass().getSimpleName(), disputeResult.getTradeId()); + log.info("Processing {} for {} {}", disputeClosedMessage.getClass().getSimpleName(), trade.getClass().getSimpleName(), disputeResult.getTradeId()); - // get dispute - Optional disputeOptional = findDispute(disputeResult); - String uid = disputeClosedMessage.getUid(); - if (!disputeOptional.isPresent()) { - log.warn("We got a dispute closed msg but we don't have a matching dispute. " + - "That might happen when we get the DisputeClosedMessage before the dispute was created. " + - "We try again after 2 sec. to apply the DisputeClosedMessage. TradeId = " + tradeId); - if (!delayMsgMap.containsKey(uid)) { - // We delay 2 sec. to be sure the comm. msg gets added first - Timer timer = UserThread.runAfter(() -> handleDisputeClosedMessage(disputeClosedMessage), 2); - delayMsgMap.put(uid, timer); - } else { - log.warn("We got a dispute closed msg after we already repeated to apply the message after a delay. " + - "That should never happen. TradeId = " + tradeId); - } - return; - } - dispute = disputeOptional.get(); - - // verify arbitrator signature - String summaryText = chatMessage.getMessage(); - if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId())); - if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring - else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail if arbitrator is unregistered) - - // save dispute closed message for reprocessing - trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage); - requestPersistence(); - - // verify arbitrator does not receive DisputeClosedMessage - if (keyRing.getPubKeyRing().equals(dispute.getAgentPubKeyRing())) { - log.error("Arbitrator received disputeResultMessage. That should never happen."); - trade.getArbitrator().setDisputeClosedMessage(null); // don't reprocess - return; - } - - // set dispute state - cleanupRetryMap(uid); - if (!dispute.getChatMessages().contains(chatMessage)) { - dispute.addAndPersistChatMessage(chatMessage); - } else { - log.warn("We got a dispute mail msg that we have already stored. TradeId = " + chatMessage.getTradeId()); - } - dispute.setIsClosed(); - if (dispute.disputeResultProperty().get() != null) { - log.info("We already got a dispute result, indicating the message was resent after updating multisig info. TradeId = " + tradeId); - } - dispute.setDisputeResult(disputeResult); - - // sync and save wallet - if (!trade.isPayoutPublished()) { - trade.syncAndPollWallet(); - trade.saveWallet(); - } - - // update multisig hex - if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex()); - if (trade.walletExists()) trade.importMultisigHex(); - - // attempt to sign and publish dispute payout tx if given and not already published - if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) { - - // wait to sign and publish payout tx if defer flag set - if (disputeClosedMessage.isDeferPublishPayout()) { - log.info("Deferring signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId()); - for (int i = 0; i < 5; i++) { - if (trade.isPayoutPublished()) break; - GenUtils.waitFor(Trade.DEFER_PUBLISH_MS / 5); + // get dispute + Optional disputeOptional = findDispute(disputeResult); + String uid = disputeClosedMessage.getUid(); + if (!disputeOptional.isPresent()) { + log.warn("We got a dispute closed msg but we don't have a matching dispute. " + + "That might happen when we get the DisputeClosedMessage before the dispute was created. " + + "We try again after 2 sec. to apply the DisputeClosedMessage. TradeId = " + tradeId); + if (!delayMsgMap.containsKey(uid)) { + // We delay 2 sec. to be sure the comm. msg gets added first + Timer timer = UserThread.runAfter(() -> handleDisputeClosedMessage(disputeClosedMessage), 2); + delayMsgMap.put(uid, timer); + } else { + log.warn("We got a dispute closed msg after we already repeated to apply the message after a delay. " + + "That should never happen. TradeId = " + tradeId); } - if (!trade.isPayoutPublished()) trade.syncAndPollWallet(); + return; + } + dispute = disputeOptional.get(); + + // verify arbitrator signature + String summaryText = chatMessage.getMessage(); + if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId())); + if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring + else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail if arbitrator is unregistered) + + // save dispute closed message for reprocessing + trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage); + requestPersistence(); + + // verify arbitrator does not receive DisputeClosedMessage + if (keyRing.getPubKeyRing().equals(dispute.getAgentPubKeyRing())) { + log.error("Arbitrator received disputeResultMessage. That should never happen."); + trade.getArbitrator().setDisputeClosedMessage(null); // don't reprocess + return; } - // sign and publish dispute payout tx if peer still has not published - if (trade.isPayoutPublished()) { - log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); + // set dispute state + cleanupRetryMap(uid); + if (!dispute.getChatMessages().contains(chatMessage)) { + dispute.addAndPersistChatMessage(chatMessage); } else { - try { - log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId()); - signAndPublishDisputePayoutTx(trade); - } catch (Exception e) { + log.warn("We got a dispute mail msg that we have already stored. TradeId = " + chatMessage.getTradeId()); + } + dispute.setIsClosed(); + if (dispute.disputeResultProperty().get() != null) { + log.info("We already got a dispute result, indicating the message was resent after updating multisig info. TradeId = " + tradeId); + } + dispute.setDisputeResult(disputeResult); - // check if payout published again - trade.syncAndPollWallet(); - if (trade.isPayoutPublished()) { - log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); - } else { - throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId); + // sync and save wallet + if (!trade.isPayoutPublished()) { + trade.syncAndPollWallet(); + trade.saveWallet(); + } + + // update multisig hex + if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex()); + if (trade.walletExists()) trade.importMultisigHex(); + + // attempt to sign and publish dispute payout tx if given and not already published + if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) { + + // wait to sign and publish payout tx if defer flag set + if (disputeClosedMessage.isDeferPublishPayout()) { + log.info("Deferring signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId()); + for (int i = 0; i < 5; i++) { + if (trade.isPayoutPublished()) break; + GenUtils.waitFor(Trade.DEFER_PUBLISH_MS / 5); + } + if (!trade.isPayoutPublished()) trade.syncAndPollWallet(); + } + + // sign and publish dispute payout tx if peer still has not published + if (trade.isPayoutPublished()) { + log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); + } else { + try { + log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId()); + signAndPublishDisputePayoutTx(trade); + } catch (Exception e) { + + // check if payout published again + trade.syncAndPollWallet(); + if (trade.isPayoutPublished()) { + log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); + } else { + throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId); + } } } + } else { + if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); + else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId()); } - } else { - if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); - else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId()); - } - // complete disputed trade - if (trade.isPayoutPublished()) { - tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED); - } + // complete disputed trade + if (trade.isPayoutPublished()) { + tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED); + } - // We use the chatMessage as we only persist those not the DisputeClosedMessage. - // If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage. - sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null); - requestPersistence(); - } catch (Exception e) { - log.warn("Error processing dispute closed message: " + e.getMessage()); - e.printStackTrace(); - requestPersistence(); - - // nack bad message and do not reprocess - if (e instanceof IllegalArgumentException) { - trade.getArbitrator().setDisputeClosedMessage(null); // message is processed - sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), false, e.getMessage()); + // We use the chatMessage as we only persist those not the DisputeClosedMessage. + // If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage. + sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null); + requestPersistence(); + } catch (Exception e) { + log.warn("Error processing dispute closed message: " + e.getMessage()); + e.printStackTrace(); requestPersistence(); - throw e; - } - // schedule to reprocess message unless deleted - if (trade.getArbitrator().getDisputeClosedMessage() != null) { - if (!reprocessDisputeClosedMessageCounts.containsKey(trade.getId())) reprocessDisputeClosedMessageCounts.put(trade.getId(), 0); - UserThread.runAfter(() -> { - reprocessDisputeClosedMessageCounts.put(trade.getId(), reprocessDisputeClosedMessageCounts.get(trade.getId()) + 1); // increment reprocess count - maybeReprocessDisputeClosedMessage(trade, reprocessOnError); - }, trade.getReprocessDelayInSeconds(reprocessDisputeClosedMessageCounts.get(trade.getId()))); + // nack bad message and do not reprocess + if (e instanceof IllegalArgumentException) { + trade.getArbitrator().setDisputeClosedMessage(null); // message is processed + sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), false, e.getMessage()); + requestPersistence(); + throw e; + } + + // schedule to reprocess message unless deleted + if (trade.getArbitrator().getDisputeClosedMessage() != null) { + if (!reprocessDisputeClosedMessageCounts.containsKey(trade.getId())) reprocessDisputeClosedMessageCounts.put(trade.getId(), 0); + UserThread.runAfter(() -> { + reprocessDisputeClosedMessageCounts.put(trade.getId(), reprocessDisputeClosedMessageCounts.get(trade.getId()) + 1); // increment reprocess count + maybeReprocessDisputeClosedMessage(trade, reprocessOnError); + }, trade.getReprocessDelayInSeconds(reprocessDisputeClosedMessageCounts.get(trade.getId()))); + } } } - } + }, trade.getId()); } public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) { diff --git a/core/src/main/java/haveno/core/trade/HavenoUtils.java b/core/src/main/java/haveno/core/trade/HavenoUtils.java index 360c3730..46134a99 100644 --- a/core/src/main/java/haveno/core/trade/HavenoUtils.java +++ b/core/src/main/java/haveno/core/trade/HavenoUtils.java @@ -44,18 +44,8 @@ import java.security.PrivateKey; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import monero.common.MoneroRpcConnection; @@ -79,9 +69,6 @@ public class HavenoUtils { private static final BigInteger XMR_AU_MULTIPLIER = new BigInteger("1000000000000"); public static final DecimalFormat XMR_FORMATTER = new DecimalFormat("##############0.000000000000", DECIMAL_FORMAT_SYMBOLS); public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss"); - private static final int POOL_SIZE = 10; - private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); - private static final Map POOLS = new HashMap<>(); // TODO: better way to share references? public static ArbitrationManager arbitrationManager; @@ -474,86 +461,6 @@ public class HavenoUtils { } } - public static Future submitToPool(Runnable task) { - return submitToPool(Arrays.asList(task)).get(0); - } - - public static List> submitToPool(List tasks) { - List> futures = new ArrayList<>(); - for (Runnable task : tasks) futures.add(POOL.submit(task)); - return futures; - } - - public static Future submitToSharedThread(Runnable task) { - return submitToThread(task, HavenoUtils.class.getSimpleName()); - } - - public static Future submitToThread(Runnable task, String threadId) { - synchronized (POOLS) { - if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1)); - return POOLS.get(threadId).submit(task); - } - } - - public static Future awaitThread(Runnable task, String threadId) { - Future future = submitToThread(task, threadId); - try { - future.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - return future; - } - - public static void removeThreadId(String threadId) { - synchronized (POOLS) { - if (POOLS.containsKey(threadId)) { - POOLS.get(threadId).shutdown(); - POOLS.remove(threadId); - } - } - } - - // TODO: these are unused; remove? use monero-java awaitTasks() when updated - - public static Future awaitTask(Runnable task) { - return awaitTasks(Arrays.asList(task)).get(0); - } - - public static List> awaitTasks(Collection tasks) { - return awaitTasks(tasks, tasks.size()); - } - - public static List> awaitTasks(Collection tasks, int maxConcurrency) { - return awaitTasks(tasks, maxConcurrency, null); - } - - public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) { - List> futures = new ArrayList<>(); - if (tasks.isEmpty()) return futures; - ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency); - for (Runnable task : tasks) futures.add(pool.submit(task)); - pool.shutdown(); - - // interrupt after timeout - if (timeoutSeconds != null) { - try { - if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow(); - } catch (InterruptedException e) { - pool.shutdownNow(); - throw new RuntimeException(e); - } - } - - // throw exception from any tasks - try { - for (Future future : futures) future.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - return futures; - } - public static String toCamelCase(String underscore) { return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, underscore); } diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 34991345..5896b5d7 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import common.utils.GenUtils; +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.common.config.Config; import haveno.common.crypto.Encryption; @@ -116,6 +117,7 @@ import static com.google.common.base.Preconditions.checkNotNull; public abstract class Trade implements Tradable, Model { private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_"; + private static final long SHUTDOWN_TIMEOUT_MS = 90000; private final Object walletLock = new Object(); private final Object pollLock = new Object(); private MoneroWallet wallet; @@ -586,7 +588,7 @@ public abstract class Trade implements Tradable, Model { /////////////////////////////////////////////////////////////////////////////////////////// public void initialize(ProcessModelServiceProvider serviceProvider) { - synchronized (this) { + ThreadUtils.await(() -> { if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); // set arbitrator pub key ring once known @@ -596,8 +598,8 @@ public abstract class Trade implements Tradable, Model { // handle connection change on dedicated thread xmrConnectionService.addConnectionListener(connection -> { - HavenoUtils.submitToPool(() -> { - HavenoUtils.submitToThread(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); + ThreadUtils.submitToPool(() -> { // TODO: remove this? + ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); }); }); @@ -621,7 +623,7 @@ public abstract class Trade implements Tradable, Model { // handle trade state events tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { if (newValue == Trade.State.MULTISIG_COMPLETED) { updateWalletRefreshPeriod(); startPolling(); @@ -631,7 +633,7 @@ public abstract class Trade implements Tradable, Model { // handle trade phase events tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); if (isPaymentReceived()) { UserThread.execute(() -> { @@ -646,7 +648,7 @@ public abstract class Trade implements Tradable, Model { // handle payout events payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { if (isPayoutPublished()) updateWalletRefreshPeriod(); // handle when payout published @@ -677,7 +679,7 @@ public abstract class Trade implements Tradable, Model { if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { if (!isInitialized) return; log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { deleteWallet(); maybeClearProcessData(); if (idlePayoutSyncer != null) { @@ -722,7 +724,7 @@ public abstract class Trade implements Tradable, Model { // initialize syncing and polling initSyncing(); - } + }, getId()); } public void requestPersistence() { @@ -787,8 +789,8 @@ public abstract class Trade implements Tradable, Model { // check wallet connection on same thread as connection change CountDownLatch latch = new CountDownLatch(1); - HavenoUtils.submitToPool((() -> { - HavenoUtils.submitToThread(() -> { + ThreadUtils.submitToPool((() -> { + ThreadUtils.execute(() -> { if (!isWalletConnectedToDaemon()) throw new RuntimeException("Trade wallet is not connected to a Monero node"); // wallet connection is updated on trade thread latch.countDown(); }, getConnectionChangedThreadId()); @@ -1222,36 +1224,47 @@ public abstract class Trade implements Tradable, Model { } public void onShutDownStarted() { + if (wallet != null) log.info("Preparing to shut down {} {}", getClass().getSimpleName(), getId()); isShutDownStarted = true; - if (wallet != null) log.info("{} {} preparing for shut down", getClass().getSimpleName(), getId()); stopPolling(); - - // repeatedly acquire trade lock to allow other threads to finish - for (int i = 0; i < 20; i++) { - synchronized (this) { - synchronized (walletLock) { - if (isShutDown) break; - } - } - } } public void shutDown() { - if (!isPayoutUnlocked()) log.info("{} {} shutting down", getClass().getSimpleName(), getId()); - synchronized (this) { - isInitialized = false; - isShutDown = true; - synchronized (walletLock) { - if (wallet != null) { - xmrWalletService.saveWallet(wallet, false); // skip backup - stopWallet(); - } + if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId()); + + // shut down thread pools with timeout + List tasks = new ArrayList<>(); + tasks.add(() -> ThreadUtils.shutDown(getId(), SHUTDOWN_TIMEOUT_MS)); + tasks.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId(), SHUTDOWN_TIMEOUT_MS)); + try { + ThreadUtils.awaitTasks(tasks); + } catch (Exception e) { + log.warn("Timeout shutting down {} {}", getClass().getSimpleName(), getId()); + + // force stop wallet + if (wallet != null) { + log.warn("Force stopping wallet for {} {}", getClass().getSimpleName(), getId()); + xmrWalletService.stopWallet(wallet, wallet.getPath(), true); + wallet = null; } + } + + // de-initialize + isInitialized = false; + isShutDown = true; + if (idlePayoutSyncer != null) { + xmrWalletService.removeWalletListener(idlePayoutSyncer); + idlePayoutSyncer = null; + } + if (wallet != null) { + xmrWalletService.saveWallet(wallet, false); // skip backup + stopWallet(); + } + UserThread.execute(() -> { if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe(); if (tradePhaseSubscription != null) tradePhaseSubscription.unsubscribe(); if (payoutStateSubscription != null) payoutStateSubscription.unsubscribe(); - idlePayoutSyncer = null; // main wallet removes listener itself - } + }); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -1262,11 +1275,6 @@ public abstract class Trade implements Tradable, Model { public void onComplete() { } - public void onRemoved() { - HavenoUtils.removeThreadId(getId()); - HavenoUtils.removeThreadId(getConnectionChangedThreadId()); - } - /////////////////////////////////////////////////////////////////////////////////////////// // Abstract @@ -1864,7 +1872,7 @@ public abstract class Trade implements Tradable, Model { // sync and reprocess messages on new thread if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) { - HavenoUtils.submitToPool(() -> initSyncing()); + ThreadUtils.execute(() -> initSyncing(), getId()); } } } @@ -1875,11 +1883,9 @@ public abstract class Trade implements Tradable, Model { initSyncingAux(); } else { long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getWalletRefreshPeriod()); // random time to start syncing - UserThread.runAfter(() -> { - HavenoUtils.submitToPool(() -> { - if (!isShutDownStarted) initSyncingAux(); - }); - }, startSyncingInMs / 1000l); + UserThread.runAfter(() -> ThreadUtils.execute(() -> { + if (!isShutDownStarted) initSyncingAux(); + }, getId()), startSyncingInMs / 1000l); } } @@ -2108,13 +2114,11 @@ public abstract class Trade implements Tradable, Model { @Override public void onNewBlock(long height) { - HavenoUtils.submitToThread(() -> { // allow rapid notifications + ThreadUtils.execute(() -> { // allow rapid notifications // skip rapid succession blocks - synchronized (this) { - if (processing) return; - processing = true; - } + if (processing) return; + processing = true; // skip if not idling and not waiting for payout to unlock if (!isIdling() || !isPayoutPublished() || isPayoutUnlocked()) { diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 0a7e4134..0c722b92 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import common.utils.GenUtils; import haveno.common.ClockWatcher; +import haveno.common.ThreadUtils; import haveno.common.crypto.KeyRing; import haveno.common.crypto.PubKeyRing; import haveno.common.handlers.ErrorMessageHandler; @@ -234,7 +235,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); if (!(networkEnvelope instanceof TradeMessage)) return; String tradeId = ((TradeMessage) networkEnvelope).getTradeId(); - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { if (networkEnvelope instanceof InitTradeRequest) { handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); } else if (networkEnvelope instanceof InitMultisigRequest) { @@ -315,10 +316,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); try { - HavenoUtils.awaitTasks(tasks); + ThreadUtils.awaitTasks(tasks); } catch (Exception e) { log.warn("Error notifying trades that shut down started: {}", e.getMessage()); - e.printStackTrace(); + throw e; } } @@ -345,7 +346,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); try { - HavenoUtils.awaitTasks(tasks); + ThreadUtils.awaitTasks(tasks); } catch (Exception e) { log.warn("Error shutting down trades: {}", e.getMessage()); e.printStackTrace(); @@ -413,7 +414,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); }; - HavenoUtils.awaitTasks(tasks, threadPoolSize); + ThreadUtils.awaitTasks(tasks, threadPoolSize); log.info("Done initializing persisted trades"); if (isShutDownStarted) return; @@ -422,7 +423,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // sync idle trades once in background after active trades for (Trade trade : trades) { - if (trade.isIdling()) HavenoUtils.submitToPool(() -> trade.syncAndPollWallet()); + if (trade.isIdling()) ThreadUtils.submitToPool(() -> trade.syncAndPollWallet()); } // process after all wallets initialized @@ -1224,7 +1225,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // remove trade tradableList.remove(trade); - trade.onRemoved(); // unregister and persist p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade)); diff --git a/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java index f6a6e351..01017aff 100644 --- a/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java @@ -17,6 +17,7 @@ e * This file is part of Haveno. package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.core.trade.BuyerAsMakerTrade; import haveno.core.trade.Trade; @@ -43,7 +44,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol NodeAddress peer, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.errorMessageHandler = errorMessageHandler; @@ -66,6 +67,6 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol .executeTasks(true); awaitTradeLatch(); } - }).start(); + }, trade.getId()); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java index 962fc6fb..7baacee2 100644 --- a/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java @@ -18,6 +18,7 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.core.trade.BuyerAsTakerTrade; import haveno.core.trade.Trade; @@ -47,7 +48,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol public void onTakeOffer(TradeResultHandler tradeResultHandler, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".onTakeOffer()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.tradeResultHandler = tradeResultHandler; @@ -71,6 +72,6 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol .executeTasks(true); awaitTradeLatch(); } - }).start(); + }, trade.getId()); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java index 6ebd8021..c31d6453 100644 --- a/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java @@ -18,6 +18,7 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.core.trade.SellerAsMakerTrade; import haveno.core.trade.Trade; @@ -48,7 +49,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc NodeAddress peer, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.errorMessageHandler = errorMessageHandler; @@ -71,6 +72,6 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc .executeTasks(true); awaitTradeLatch(); } - }).start(); + }, trade.getId()); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java index 98bfa7e6..a9898301 100644 --- a/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java @@ -18,6 +18,7 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.core.trade.SellerAsTakerTrade; import haveno.core.trade.Trade; @@ -48,7 +49,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc public void onTakeOffer(TradeResultHandler tradeResultHandler, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getSimpleName() + ".onTakeOffer()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.tradeResultHandler = tradeResultHandler; @@ -72,6 +73,6 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc .executeTasks(true); awaitTradeLatch(); } - }).start(); + }, trade.getId()); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index d9ca7a0a..0fc2b18f 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -17,6 +17,7 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.Timer; import haveno.common.UserThread; import haveno.common.crypto.PubKeyRing; @@ -113,12 +114,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); - HavenoUtils.submitToThread(() -> handle(message, peerNodeAddress), trade.getId()); + ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId()); } protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); - handle(message, peerNodeAddress); + ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId()); } private void handle(TradeMessage message, NodeAddress peerNodeAddress) { @@ -264,7 +265,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } public void maybeSendDepositsConfirmedMessages() { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return; synchronized (trade) { if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down @@ -285,7 +286,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { // skip if no need to reprocess @@ -296,190 +297,198 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId()); handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); } - - handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); }, trade.getId()); } public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest()"); - synchronized (trade) { + ThreadUtils.execute(() -> { + synchronized (trade) { - // check trade - if (trade.hasFailed()) { - log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), request.getClass().getSimpleName(), sender, trade.getErrorMessage()); - return; + // check trade + if (trade.hasFailed()) { + log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), request.getClass().getSimpleName(), sender, trade.getErrorMessage()); + return; + } + Validator.checkTradeId(processModel.getOfferId(), request); + + // process message + latchTrade(); + processModel.setTradeMessage(request); + expect(anyPhase(Trade.Phase.INIT) + .with(request) + .from(sender)) + .setup(tasks( + ProcessInitMultisigRequest.class, + MaybeSendSignContractRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, request); + }, + errorMessage -> { + handleTaskRunnerFault(sender, request, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); } - Validator.checkTradeId(processModel.getOfferId(), request); - - // proocess message - latchTrade(); - processModel.setTradeMessage(request); - expect(anyPhase(Trade.Phase.INIT) - .with(request) - .from(sender)) - .setup(tasks( - ProcessInitMultisigRequest.class, - MaybeSendSignContractRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, request); - }, - errorMessage -> { - handleTaskRunnerFault(sender, request, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + }, trade.getId()); } public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() " + trade.getId()); - synchronized (trade) { + ThreadUtils.execute(() -> { + synchronized (trade) { - // check trade - if (trade.hasFailed()) { - log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage()); - return; - } - Validator.checkTradeId(processModel.getOfferId(), message); - - // process message - if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { - latchTrade(); + // check trade + if (trade.hasFailed()) { + log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage()); + return; + } Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) - .with(message) - .from(sender)) - .setup(tasks( - // TODO (woodser): validate request - ProcessSignContractRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, message); - }, - errorMessage -> { - handleTaskRunnerFault(sender, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) // extend timeout - .executeTasks(true); - awaitTradeLatch(); - } else { - - // process sign contract request after multisig created - EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.MULTISIG_COMPLETED) HavenoUtils.submitToThread(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock - }); + + // process message + if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); + expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) + .with(message) + .from(sender)) + .setup(tasks( + // TODO (woodser): validate request + ProcessSignContractRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, message); + }, + errorMessage -> { + handleTaskRunnerFault(sender, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) // extend timeout + .executeTasks(true); + awaitTradeLatch(); + } else { + + // process sign contract request after multisig created + EasyBind.subscribe(trade.stateProperty(), state -> { + if (state == Trade.State.MULTISIG_COMPLETED) ThreadUtils.execute(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock + }); + } } - } + }, trade.getId()); } public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() " + trade.getId()); - synchronized (trade) { + ThreadUtils.execute(() -> { + synchronized (trade) { - // check trade - if (trade.hasFailed()) { - log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage()); - return; - } - Validator.checkTradeId(processModel.getOfferId(), message); - - // process message - if (trade.getState() == Trade.State.CONTRACT_SIGNED) { - latchTrade(); + // check trade + if (trade.hasFailed()) { + log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage()); + return; + } Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(state(Trade.State.CONTRACT_SIGNED) - .with(message) - .from(sender)) - .setup(tasks( - // TODO (woodser): validate request - ProcessSignContractResponse.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, message); - }, - errorMessage -> { - handleTaskRunnerFault(sender, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) // extend timeout - .executeTasks(true); - awaitTradeLatch(); - } else { - - // process sign contract response after contract signed - EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.CONTRACT_SIGNED) HavenoUtils.submitToThread(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock - }); + + // process message + if (trade.getState() == Trade.State.CONTRACT_SIGNED) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); + expect(state(Trade.State.CONTRACT_SIGNED) + .with(message) + .from(sender)) + .setup(tasks( + // TODO (woodser): validate request + ProcessSignContractResponse.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, message); + }, + errorMessage -> { + handleTaskRunnerFault(sender, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) // extend timeout + .executeTasks(true); + awaitTradeLatch(); + } else { + + // process sign contract response after contract signed + EasyBind.subscribe(trade.stateProperty(), state -> { + if (state == Trade.State.CONTRACT_SIGNED) ThreadUtils.execute(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock + }); + } } - } + }, trade.getId()); } public void handleDepositResponse(DepositResponse response, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleDepositResponse()"); - synchronized (trade) { + ThreadUtils.execute(() -> { + synchronized (trade) { - // check trade - if (trade.hasFailed()) { - log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), response.getClass().getSimpleName(), sender, trade.getErrorMessage()); - return; + // check trade + if (trade.hasFailed()) { + log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), response.getClass().getSimpleName(), sender, trade.getErrorMessage()); + return; + } + Validator.checkTradeId(processModel.getOfferId(), response); + + // process message + latchTrade(); + processModel.setTradeMessage(response); + expect(anyState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK) + .with(response) + .from(sender)) + .setup(tasks( + ProcessDepositResponse.class, + RemoveOffer.class, + SellerPublishTradeStatistics.class) + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + this.errorMessageHandler = null; // TODO: set this when trade state is >= DEPOSIT_PUBLISHED + handleTaskRunnerSuccess(sender, response); + if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized + }, + errorMessage -> { + handleTaskRunnerFault(sender, response, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); } - Validator.checkTradeId(processModel.getOfferId(), response); - - // process message - latchTrade(); - processModel.setTradeMessage(response); - expect(anyState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK) - .with(response) - .from(sender)) - .setup(tasks( - ProcessDepositResponse.class, - RemoveOffer.class, - SellerPublishTradeStatistics.class) - .using(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - this.errorMessageHandler = null; // TODO: set this when trade state is >= DEPOSIT_PUBLISHED - handleTaskRunnerSuccess(sender, response); - if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized - }, - errorMessage -> { - handleTaskRunnerFault(sender, response, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + }, trade.getId()); } public void handle(DepositsConfirmedMessage response, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)"); - synchronized (trade) { - latchTrade(); - this.errorMessageHandler = null; - expect(new Condition(trade) - .with(response) - .from(sender)) - .setup(tasks( - ProcessDepositsConfirmedMessage.class, - VerifyPeersAccountAgeWitness.class, - MaybeResendDisputeClosedMessageWithPayout.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(sender, response); - }, - errorMessage -> { - handleTaskRunnerFault(sender, response, errorMessage); - }))) - .executeTasks(); - awaitTradeLatch(); - } + ThreadUtils.execute(() -> { + synchronized (trade) { + latchTrade(); + this.errorMessageHandler = null; + expect(new Condition(trade) + .with(response) + .from(sender)) + .setup(tasks( + ProcessDepositsConfirmedMessage.class, + VerifyPeersAccountAgeWitness.class, + MaybeResendDisputeClosedMessageWithPayout.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(sender, response); + }, + errorMessage -> { + handleTaskRunnerFault(sender, response, errorMessage); + }))) + .executeTasks(); + awaitTradeLatch(); + } + }, trade.getId()); } // received by seller and arbitrator @@ -489,43 +498,45 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D log.warn("Ignoring PaymentSentMessage since not seller or arbitrator"); return; } - // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case - // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received - // a mailbox message with PaymentSentMessage. - // TODO A better fix would be to add a listener for the wallet sync state and process - // the mailbox msg once wallet is ready and trade state set. - synchronized (trade) { - if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { - log.warn("Received another PaymentSentMessage which was already processed, ACKing"); - handleTaskRunnerSuccess(peer, message); - return; + ThreadUtils.execute(() -> { + // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case + // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received + // a mailbox message with PaymentSentMessage. + // TODO A better fix would be to add a listener for the wallet sync state and process + // the mailbox msg once wallet is ready and trade state set. + synchronized (trade) { + if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { + log.warn("Received another PaymentSentMessage which was already processed, ACKing"); + handleTaskRunnerSuccess(peer, message); + return; + } + if (trade.getPayoutTx() != null) { + log.warn("We received a PaymentSentMessage but we have already created the payout tx " + + "so we ignore the message. This can happen if the ACK message to the peer did not " + + "arrive and the peer repeats sending us the message. We send another ACK msg."); + sendAckMessage(peer, message, true, null); + removeMailboxMessageAfterProcessing(message); + return; + } + latchTrade(); + expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED) + .with(message) + .from(peer)) + .setup(tasks( + ApplyFilter.class, + ProcessPaymentSentMessage.class, + VerifyPeersAccountAgeWitness.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + (errorMessage) -> { + handleTaskRunnerFault(peer, message, errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); } - if (trade.getPayoutTx() != null) { - log.warn("We received a PaymentSentMessage but we have already created the payout tx " + - "so we ignore the message. This can happen if the ACK message to the peer did not " + - "arrive and the peer repeats sending us the message. We send another ACK msg."); - sendAckMessage(peer, message, true, null); - removeMailboxMessageAfterProcessing(message); - return; - } - latchTrade(); - expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED) - .with(message) - .from(peer)) - .setup(tasks( - ApplyFilter.class, - ProcessPaymentSentMessage.class, - VerifyPeersAccountAgeWitness.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - (errorMessage) -> { - handleTaskRunnerFault(peer, message, errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); - } + }, trade.getId()); } // received by buyer and arbitrator @@ -535,56 +546,58 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D private void handle(PaymentReceivedMessage message, NodeAddress peer, boolean reprocessOnError) { System.out.println(getClass().getSimpleName() + ".handle(PaymentReceivedMessage)"); - if (!(trade instanceof BuyerTrade || trade instanceof ArbitratorTrade)) { - log.warn("Ignoring PaymentReceivedMessage since not buyer or arbitrator"); - return; - } - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - - // check minimum trade phase - if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) { - log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); - return; - } - if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) { - log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); - return; - } - if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) { - log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + ThreadUtils.execute(() -> { + if (!(trade instanceof BuyerTrade || trade instanceof ArbitratorTrade)) { + log.warn("Ignoring PaymentReceivedMessage since not buyer or arbitrator"); return; } + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); - expect(anyPhase() - .with(message) - .from(peer)) - .setup(tasks( - ProcessPaymentReceivedMessage.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - log.warn("Error processing payment received message: " + errorMessage); - processModel.getTradeManager().requestPersistence(); + // check minimum trade phase + if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) { + log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + return; + } + if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) { + log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + return; + } + if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) { + log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + return; + } - // schedule to reprocess message unless deleted - if (trade.getSeller().getPaymentReceivedMessage() != null) { - UserThread.runAfter(() -> { - reprocessPaymentReceivedMessageCount++; - maybeReprocessPaymentReceivedMessage(reprocessOnError); - }, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount)); - } else { - handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack - } - unlatchTrade(); - }))) - .executeTasks(true); - awaitTradeLatch(); - } + expect(anyPhase() + .with(message) + .from(peer)) + .setup(tasks( + ProcessPaymentReceivedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + log.warn("Error processing payment received message: " + errorMessage); + processModel.getTradeManager().requestPersistence(); + + // schedule to reprocess message unless deleted + if (trade.getSeller().getPaymentReceivedMessage() != null) { + UserThread.runAfter(() -> { + reprocessPaymentReceivedMessageCount++; + maybeReprocessPaymentReceivedMessage(reprocessOnError); + }, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount)); + } else { + handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack + } + unlatchTrade(); + }))) + .executeTasks(true); + awaitTradeLatch(); + } + }, trade.getId()); } public void onWithdrawCompleted() { diff --git a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java index f68432f5..e75fdb0e 100644 --- a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java +++ b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.Service.State; import com.google.inject.name.Named; import common.utils.JsonUtils; +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.common.config.Config; import haveno.common.file.FileUtil; @@ -707,7 +708,11 @@ public class XmrWalletService { public void onShutDownStarted() { log.info("XmrWalletService.onShutDownStarted()"); this.isShutDownStarted = true; + } + public void shutDown() { + log.info("Shutting down {}", getClass().getSimpleName()); + // remove listeners which stops polling wallet // TODO monero-java: wallet.stopPolling()? synchronized (walletLock) { @@ -717,15 +722,11 @@ public class XmrWalletService { } } } - } - - public void shutDown() { - log.info("Shutting down {}", getClass().getSimpleName()); // shut down trade and main wallets at same time walletListeners.clear(); closeMainWallet(true); - log.info("Done shutting down all wallets"); + log.info("Done shutting down main wallet"); } // ------------------------------ PRIVATE HELPERS ------------------------- @@ -734,7 +735,7 @@ public class XmrWalletService { // listen for connection changes xmrConnectionService.addConnectionListener(connection -> { - HavenoUtils.submitToThread(() -> onConnectionChanged(connection), THREAD_ID); + ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID); }); // initialize main wallet when daemon synced @@ -744,7 +745,7 @@ public class XmrWalletService { } private void initMainWalletIfConnected() { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { synchronized (walletLock) { if (xmrConnectionService.downloadPercentageProperty().get() == 1 && wallet == null && !isShutDownStarted) { maybeInitMainWallet(true); @@ -817,12 +818,12 @@ public class XmrWalletService { // reschedule to init main wallet UserThread.runAfter(() -> { - HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID); + ThreadUtils.execute(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID); }, xmrConnectionService.getRefreshPeriodMs() / 1000); } else { log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000); UserThread.runAfter(() -> { - HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID); + ThreadUtils.execute(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID); }, xmrConnectionService.getRefreshPeriodMs() / 1000); } } @@ -994,7 +995,7 @@ public class XmrWalletService { // sync wallet on new thread if (connection != null) { wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE); - HavenoUtils.submitToPool(() -> { + ThreadUtils.submitToPool(() -> { if (isShutDownStarted) return; wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs()); try { @@ -1034,7 +1035,7 @@ public class XmrWalletService { } // excute tasks in parallel - HavenoUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size())); + ThreadUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size())); log.info("Done changing all wallet passwords"); } @@ -1318,7 +1319,7 @@ public class XmrWalletService { BigInteger balance; if (balanceListener.getSubaddressIndex() != null && balanceListener.getSubaddressIndex() != 0) balance = getBalanceForSubaddress(balanceListener.getSubaddressIndex()); else balance = getAvailableBalance(); - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { try { balanceListener.onBalanceChanged(balance); } catch (Exception e) { @@ -1369,14 +1370,14 @@ public class XmrWalletService { @Override public void onSyncProgress(long height, long startHeight, long endHeight, double percentDone, String message) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message); }, THREAD_ID); } @Override public void onNewBlock(long height) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { walletHeight.set(height); for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height); }, THREAD_ID); @@ -1384,7 +1385,7 @@ public class XmrWalletService { @Override public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance); updateBalanceListeners(); }, THREAD_ID); @@ -1392,14 +1393,14 @@ public class XmrWalletService { @Override public void onOutputReceived(MoneroOutputWallet output) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output); }, THREAD_ID); } @Override public void onOutputSpent(MoneroOutputWallet output) { - HavenoUtils.submitToThread(() -> { + ThreadUtils.execute(() -> { for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output); }, THREAD_ID); } diff --git a/p2p/src/main/java/haveno/network/p2p/network/Connection.java b/p2p/src/main/java/haveno/network/p2p/network/Connection.java index a154f34a..1eaa0e6a 100644 --- a/p2p/src/main/java/haveno/network/p2p/network/Connection.java +++ b/p2p/src/main/java/haveno/network/p2p/network/Connection.java @@ -32,6 +32,7 @@ import haveno.network.p2p.storage.payload.CapabilityRequiringPayload; import haveno.network.p2p.storage.payload.PersistableNetworkPayload; import haveno.common.Proto; +import haveno.common.ThreadUtils; import haveno.common.app.Capabilities; import haveno.common.app.HasCapabilities; import haveno.common.app.Version; @@ -73,7 +74,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -109,7 +109,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { //TODO decrease limits again after testing private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(240); private static final int SHUTDOWN_TIMEOUT = 100; - private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); // one shared thread to handle messages sequentially + private static final String THREAD_ID = Connection.class.getSimpleName(); public static int getPermittedMessageSize() { return PERMITTED_MESSAGE_SIZE; @@ -212,7 +212,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { reportInvalidRequest(RuleViolation.PEER_BANNED); } } - EXECUTOR.execute(() -> connectionListener.onConnection(this)); + ThreadUtils.execute(() -> connectionListener.onConnection(this), THREAD_ID); } catch (Throwable e) { handleException(e); } @@ -266,8 +266,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { if (!stopped) { protoOutputStream.writeEnvelope(networkEnvelope); - EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this))); - EXECUTOR.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize)); + ThreadUtils.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)), THREAD_ID); + ThreadUtils.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize), THREAD_ID); } } catch (Throwable t) { handleException(t); @@ -396,7 +396,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { if (networkEnvelope instanceof BundleOfEnvelopes) { onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection); } else { - EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); + ThreadUtils.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)), THREAD_ID); } } @@ -432,8 +432,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { envelopesToProcess.add(networkEnvelope); } } - envelopesToProcess.forEach(envelope -> EXECUTOR.execute(() -> - messageListeners.forEach(listener -> listener.onMessage(envelope, connection)))); + envelopesToProcess.forEach(envelope -> ThreadUtils.execute(() -> { + messageListeners.forEach(listener -> listener.onMessage(envelope, connection)); + }, THREAD_ID)); } @@ -503,7 +504,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { t.printStackTrace(); } finally { stopped = true; - EXECUTOR.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler)); + ThreadUtils.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID); } }, "Connection:SendCloseConnectionMessage-" + this.uid).start(); } else { @@ -513,12 +514,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { } else { //TODO find out why we get called that log.debug("stopped was already at shutDown call"); - EXECUTOR.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler)); + ThreadUtils.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID); } } private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) { - EXECUTOR.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this)); + ThreadUtils.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this), THREAD_ID); try { protoOutputStream.onConnectionShutdown(); socket.close(); @@ -541,7 +542,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { log.debug("Connection shutdown complete {}", this); if (shutDownCompleteHandler != null) - EXECUTOR.execute(shutDownCompleteHandler); + ThreadUtils.execute(shutDownCompleteHandler, THREAD_ID); } } @@ -844,8 +845,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { log.info("We got a {} from a peer with yet unknown address on connection with uid={}", networkEnvelope.getClass().getSimpleName(), uid); } - EXECUTOR.execute(() -> onMessage(networkEnvelope, this)); - EXECUTOR.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size)); + ThreadUtils.execute(() -> onMessage(networkEnvelope, this), THREAD_ID); + ThreadUtils.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size), THREAD_ID); } } catch (InvalidClassException e) { log.error(e.getMessage()); @@ -894,7 +895,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { capabilitiesListeners.forEach(weakListener -> { SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get(); if (supportedCapabilitiesListener != null) { - EXECUTOR.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities)); + ThreadUtils.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities), THREAD_ID); } }); return false;