diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java
new file mode 100644
index 00000000..c4cb5c4b
--- /dev/null
+++ b/common/src/main/java/haveno/common/ThreadUtils.java
@@ -0,0 +1,144 @@
+/*
+ * This file is part of Haveno.
+ *
+ * Haveno is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * Haveno is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with Haveno. If not, see .
+ */
+
+package haveno.common;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class ThreadUtils {
+
+ private static final Map EXECUTORS = new HashMap<>();
+ private static final Map THREAD_BY_ID = new HashMap<>();
+ private static final int POOL_SIZE = 10;
+ private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
+
+
+ public static void execute(Runnable command, String threadId) {
+ synchronized (EXECUTORS) {
+ if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1));
+ EXECUTORS.get(threadId).execute(() -> {
+ synchronized (THREAD_BY_ID) {
+ THREAD_BY_ID.put(threadId, Thread.currentThread());
+ }
+ command.run();
+ });
+ }
+ }
+
+ public static void await(Runnable command, String threadId) {
+ if (isCurrentThread(Thread.currentThread(), threadId)) {
+ command.run();
+ } else {
+ CountDownLatch latch = new CountDownLatch(1);
+ execute(command, threadId); // run task
+ execute(() -> latch.countDown(), threadId); // await next tick
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static void shutDown(String threadId, long timeoutMs) {
+ ExecutorService pool = null;
+ synchronized (EXECUTORS) {
+ if (!EXECUTORS.containsKey(threadId)) return; // thread not found
+ pool = EXECUTORS.get(threadId);
+ }
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow();
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ throw new RuntimeException(e);
+ } finally {
+ synchronized (EXECUTORS) {
+ EXECUTORS.remove(threadId);
+ }
+ synchronized (THREAD_BY_ID) {
+ THREAD_BY_ID.remove(threadId);
+ }
+ }
+ }
+
+ public static Future> submitToPool(Runnable task) {
+ return submitToPool(Arrays.asList(task)).get(0);
+ }
+
+ public static List> submitToPool(List tasks) {
+ List> futures = new ArrayList<>();
+ for (Runnable task : tasks) futures.add(POOL.submit(task));
+ return futures;
+ }
+
+ // TODO: these are unused; remove? use monero-java awaitTasks() when updated
+
+ public static Future> awaitTask(Runnable task) {
+ return awaitTasks(Arrays.asList(task)).get(0);
+ }
+
+ public static List> awaitTasks(Collection tasks) {
+ return awaitTasks(tasks, tasks.size());
+ }
+
+ public static List> awaitTasks(Collection tasks, int maxConcurrency) {
+ return awaitTasks(tasks, maxConcurrency, null);
+ }
+
+ public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) {
+ List> futures = new ArrayList<>();
+ if (tasks.isEmpty()) return futures;
+ ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
+ for (Runnable task : tasks) futures.add(pool.submit(task));
+ pool.shutdown();
+
+ // interrupt after timeout
+ if (timeoutSeconds != null) {
+ try {
+ if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow();
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ throw new RuntimeException(e);
+ }
+ }
+
+ // throw exception from any tasks
+ try {
+ for (Future> future : futures) future.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return futures;
+ }
+
+ private static boolean isCurrentThread(Thread thread, String threadId) {
+ synchronized (THREAD_BY_ID) {
+ if (!THREAD_BY_ID.containsKey(threadId)) return false;
+ return thread == THREAD_BY_ID.get(threadId);
+ }
+ }
+}
diff --git a/core/src/main/java/haveno/core/api/XmrConnectionService.java b/core/src/main/java/haveno/core/api/XmrConnectionService.java
index 66b8b846..65a4eaf1 100644
--- a/core/src/main/java/haveno/core/api/XmrConnectionService.java
+++ b/core/src/main/java/haveno/core/api/XmrConnectionService.java
@@ -1,5 +1,6 @@
package haveno.core.api;
+import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.DevEnv;
import haveno.common.config.BaseCurrencyNetwork;
@@ -562,7 +563,7 @@ public final class XmrConnectionService {
// notify listeners in parallel
synchronized (listenerLock) {
for (MoneroConnectionManagerListener listener : listeners) {
- HavenoUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection));
+ ThreadUtils.submitToPool(() -> listener.onConnectionChanged(currentConnection));
}
}
}
diff --git a/core/src/main/java/haveno/core/app/HavenoExecutable.java b/core/src/main/java/haveno/core/app/HavenoExecutable.java
index 1a7434bc..cbbc9b31 100644
--- a/core/src/main/java/haveno/core/app/HavenoExecutable.java
+++ b/core/src/main/java/haveno/core/app/HavenoExecutable.java
@@ -19,6 +19,8 @@ package haveno.core.app;
import com.google.inject.Guice;
import com.google.inject.Injector;
+
+import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.AppModule;
import haveno.common.config.Config;
@@ -41,7 +43,6 @@ import haveno.core.provider.price.PriceFeedService;
import haveno.core.setup.CorePersistedDataHost;
import haveno.core.setup.CoreSetup;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
-import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.trade.statistics.TradeStatisticsManager;
import haveno.core.xmr.setup.WalletsSetup;
@@ -340,7 +341,7 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
- HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
+ ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java
index 0fe88114..ea44c0b3 100644
--- a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java
+++ b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java
@@ -18,6 +18,8 @@
package haveno.core.app.misc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.DevEnv;
import haveno.common.config.Config;
@@ -33,7 +35,6 @@ import haveno.core.offer.OfferBookService;
import haveno.core.offer.OpenOfferManager;
import haveno.core.provider.price.PriceFeedService;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
-import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.trade.statistics.TradeStatisticsManager;
import haveno.core.xmr.setup.WalletsSetup;
@@ -103,7 +104,7 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable {
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
- HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
+ ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java
index 27bf6fd9..1de7fbcd 100644
--- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java
+++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java
@@ -18,6 +18,7 @@
package haveno.core.offer;
import common.utils.GenUtils;
+import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Capabilities;
@@ -150,6 +151,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
// poll key images of signed offers
private XmrKeyImagePoller signedOfferKeyImagePoller;
+ private static final long SHUTDOWN_TIMEOUT_MS = 90000;
private static final long KEY_IMAGE_REFRESH_PERIOD_MS_LOCAL = 20000; // 20 seconds
private static final long KEY_IMAGE_REFRESH_PERIOD_MS_REMOTE = 300000; // 5 minutes
@@ -301,7 +303,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
public void shutDown(@Nullable Runnable completeHandler) {
- HavenoUtils.removeThreadId(THREAD_ID);
stopped = true;
p2PService.getPeerManager().removeListener(this);
p2PService.removeDecryptedDirectMessageListener(this);
@@ -316,7 +317,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
int size = openOffers.size();
log.info("Remove open offers at shutDown. Number of open offers: {}", size);
if (offerBookService.isBootstrapped() && size > 0) {
- HavenoUtils.submitToThread(() -> { // finish tasks
+ ThreadUtils.execute(() -> { // finish tasks
UserThread.execute(() -> {
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()));
@@ -337,6 +338,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (completeHandler != null)
completeHandler.run();
}
+
+ // shut down pool
+ ThreadUtils.shutDown(THREAD_ID, SHUTDOWN_TIMEOUT_MS);
}
public void removeAllOpenOffers(@Nullable Runnable completeHandler) {
@@ -400,7 +404,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
maybeUpdatePersistedOffers();
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
// Wait for prices to be available
priceFeedService.awaitExternalPrices();
@@ -506,7 +510,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer openOffer = new OpenOffer(offer, triggerPrice, reserveExactAmount);
// schedule or post offer
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> {
@@ -807,7 +811,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler
ErrorMessageHandler errorMessageHandler) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (processOffersLock) {
List errorMessages = new ArrayList();
List openOffers = getOpenOffers();
@@ -1571,7 +1575,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer();
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
processListForRepublishOffers(getOpenOffers());
}, THREAD_ID);
}
@@ -1607,7 +1611,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
// determine if offer is valid
boolean isValid = true;
diff --git a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java
index 256ad8c4..9d81758b 100644
--- a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java
+++ b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java
@@ -17,6 +17,7 @@
package haveno.core.support.dispute;
+import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.app.Version;
import haveno.common.config.Config;
@@ -447,106 +448,108 @@ public abstract class DisputeManager> extends Sup
return;
}
- synchronized (trade) {
- String errorMessage = null;
- PubKeyRing senderPubKeyRing = null;
- try {
-
- // initialize
- T disputeList = getDisputeList();
- if (disputeList == null) {
- log.warn("disputes is null");
- return;
- }
- dispute.setSupportType(message.getSupportType());
- dispute.setState(Dispute.State.NEW);
- Contract contract = dispute.getContract();
-
- // validate dispute
+ ThreadUtils.execute(() -> {
+ synchronized (trade) {
+ String errorMessage = null;
+ PubKeyRing senderPubKeyRing = null;
try {
- DisputeValidation.validateDisputeData(dispute);
- DisputeValidation.validateNodeAddresses(dispute, config);
- DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress());
- //DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList());
- } catch (DisputeValidation.ValidationException e) {
- e.printStackTrace();
- validationExceptions.add(e);
- throw e;
- }
- // try to validate payment account
- // TODO: add field to dispute details: valid, invalid, missing
- try {
- DisputeValidation.validatePaymentAccountPayload(dispute);
+ // initialize
+ T disputeList = getDisputeList();
+ if (disputeList == null) {
+ log.warn("disputes is null");
+ return;
+ }
+ dispute.setSupportType(message.getSupportType());
+ dispute.setState(Dispute.State.NEW);
+ Contract contract = dispute.getContract();
+
+ // validate dispute
+ try {
+ DisputeValidation.validateDisputeData(dispute);
+ DisputeValidation.validateNodeAddresses(dispute, config);
+ DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress());
+ //DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList());
+ } catch (DisputeValidation.ValidationException e) {
+ e.printStackTrace();
+ validationExceptions.add(e);
+ throw e;
+ }
+
+ // try to validate payment account
+ // TODO: add field to dispute details: valid, invalid, missing
+ try {
+ DisputeValidation.validatePaymentAccountPayload(dispute);
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.warn(e.getMessage());
+ trade.prependErrorMessage(e.getMessage());
+ }
+
+ // get sender
+ senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing();
+ TradePeer sender = trade.getTradePeer(senderPubKeyRing);
+ if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller");
+
+ // message to trader is expected from arbitrator
+ if (!trade.isArbitrator() && sender != trade.getArbitrator()) {
+ throw new RuntimeException(message.getClass().getSimpleName() + " to trader is expected only from arbitrator");
+ }
+
+ // arbitrator verifies signature of payment sent message if given
+ if (trade.isArbitrator() && message.getPaymentSentMessage() != null) {
+ HavenoUtils.verifyPaymentSentMessage(trade, message.getPaymentSentMessage());
+ trade.getBuyer().setUpdatedMultisigHex(message.getPaymentSentMessage().getUpdatedMultisigHex());
+ trade.advanceState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
+ }
+
+ // update multisig hex
+ if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex());
+ if (trade.walletExists()) trade.importMultisigHex();
+
+ // add chat message with price info
+ if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0);
+
+ // add dispute
+ synchronized (disputeList) {
+ if (!disputeList.contains(dispute)) {
+ Optional storedDisputeOptional = findDispute(dispute);
+ if (!storedDisputeOptional.isPresent()) {
+ disputeList.add(dispute);
+ trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED);
+
+ // send dispute opened message to peer if arbitrator
+ if (trade.isArbitrator()) sendDisputeOpenedMessageToPeer(dispute, contract, dispute.isDisputeOpenerIsBuyer() ? contract.getSellerPubKeyRing() : contract.getBuyerPubKeyRing(), trade.getSelf().getUpdatedMultisigHex());
+ tradeManager.requestPersistence();
+ errorMessage = null;
+ } else {
+ // valid case if both have opened a dispute and agent was not online
+ log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId());
+ }
+
+ // add chat message with mediation info if applicable
+ addMediationResultMessage(dispute);
+ } else {
+ throw new RuntimeException("We got a dispute msg that we have already stored. TradeId = " + dispute.getTradeId());
+ }
+ }
} catch (Exception e) {
e.printStackTrace();
- log.warn(e.getMessage());
- trade.prependErrorMessage(e.getMessage());
+ errorMessage = e.getMessage();
+ log.warn(errorMessage);
+ if (trade != null) trade.setErrorMessage(errorMessage);
}
- // get sender
- senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing();
- TradePeer sender = trade.getTradePeer(senderPubKeyRing);
- if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller");
-
- // message to trader is expected from arbitrator
- if (!trade.isArbitrator() && sender != trade.getArbitrator()) {
- throw new RuntimeException(message.getClass().getSimpleName() + " to trader is expected only from arbitrator");
+ // use chat message instead of open dispute message for the ack
+ ObservableList messages = message.getDispute().getChatMessages();
+ if (!messages.isEmpty()) {
+ ChatMessage msg = messages.get(0);
+ sendAckMessage(msg, senderPubKeyRing, errorMessage == null, errorMessage);
}
- // arbitrator verifies signature of payment sent message if given
- if (trade.isArbitrator() && message.getPaymentSentMessage() != null) {
- HavenoUtils.verifyPaymentSentMessage(trade, message.getPaymentSentMessage());
- trade.getBuyer().setUpdatedMultisigHex(message.getPaymentSentMessage().getUpdatedMultisigHex());
- trade.advanceState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
- }
-
- // update multisig hex
- if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex());
- if (trade.walletExists()) trade.importMultisigHex();
-
- // add chat message with price info
- if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0);
-
- // add dispute
- synchronized (disputeList) {
- if (!disputeList.contains(dispute)) {
- Optional storedDisputeOptional = findDispute(dispute);
- if (!storedDisputeOptional.isPresent()) {
- disputeList.add(dispute);
- trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED);
-
- // send dispute opened message to peer if arbitrator
- if (trade.isArbitrator()) sendDisputeOpenedMessageToPeer(dispute, contract, dispute.isDisputeOpenerIsBuyer() ? contract.getSellerPubKeyRing() : contract.getBuyerPubKeyRing(), trade.getSelf().getUpdatedMultisigHex());
- tradeManager.requestPersistence();
- errorMessage = null;
- } else {
- // valid case if both have opened a dispute and agent was not online
- log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId());
- }
-
- // add chat message with mediation info if applicable
- addMediationResultMessage(dispute);
- } else {
- throw new RuntimeException("We got a dispute msg that we have already stored. TradeId = " + dispute.getTradeId());
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- errorMessage = e.getMessage();
- log.warn(errorMessage);
- if (trade != null) trade.setErrorMessage(errorMessage);
+ requestPersistence();
}
-
- // use chat message instead of open dispute message for the ack
- ObservableList messages = message.getDispute().getChatMessages();
- if (!messages.isEmpty()) {
- ChatMessage msg = messages.get(0);
- sendAckMessage(msg, senderPubKeyRing, errorMessage == null, errorMessage);
- }
-
- requestPersistence();
- }
+ }, trade.getId());
}
// arbitrator sends dispute opened message to opener's peer
diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java
index 4802d7c8..e0b25185 100644
--- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java
+++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java
@@ -20,6 +20,7 @@ package haveno.core.support.dispute.arbitration;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import common.utils.GenUtils;
+import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.app.Version;
@@ -116,7 +117,7 @@ public final class ArbitrationManager extends DisputeManager {
+ ThreadUtils.execute(() -> {
if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) {
@@ -187,144 +188,146 @@ public final class ArbitrationManager extends DisputeManager {
+ ChatMessage chatMessage = null;
+ Dispute dispute = null;
+ synchronized (trade) {
+ try {
+ DisputeResult disputeResult = disputeClosedMessage.getDisputeResult();
+ chatMessage = disputeResult.getChatMessage();
+ checkNotNull(chatMessage, "chatMessage must not be null");
+ String tradeId = disputeResult.getTradeId();
- log.info("Processing {} for {} {}", disputeClosedMessage.getClass().getSimpleName(), trade.getClass().getSimpleName(), disputeResult.getTradeId());
+ log.info("Processing {} for {} {}", disputeClosedMessage.getClass().getSimpleName(), trade.getClass().getSimpleName(), disputeResult.getTradeId());
- // get dispute
- Optional disputeOptional = findDispute(disputeResult);
- String uid = disputeClosedMessage.getUid();
- if (!disputeOptional.isPresent()) {
- log.warn("We got a dispute closed msg but we don't have a matching dispute. " +
- "That might happen when we get the DisputeClosedMessage before the dispute was created. " +
- "We try again after 2 sec. to apply the DisputeClosedMessage. TradeId = " + tradeId);
- if (!delayMsgMap.containsKey(uid)) {
- // We delay 2 sec. to be sure the comm. msg gets added first
- Timer timer = UserThread.runAfter(() -> handleDisputeClosedMessage(disputeClosedMessage), 2);
- delayMsgMap.put(uid, timer);
- } else {
- log.warn("We got a dispute closed msg after we already repeated to apply the message after a delay. " +
- "That should never happen. TradeId = " + tradeId);
- }
- return;
- }
- dispute = disputeOptional.get();
-
- // verify arbitrator signature
- String summaryText = chatMessage.getMessage();
- if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId()));
- if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring
- else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail if arbitrator is unregistered)
-
- // save dispute closed message for reprocessing
- trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage);
- requestPersistence();
-
- // verify arbitrator does not receive DisputeClosedMessage
- if (keyRing.getPubKeyRing().equals(dispute.getAgentPubKeyRing())) {
- log.error("Arbitrator received disputeResultMessage. That should never happen.");
- trade.getArbitrator().setDisputeClosedMessage(null); // don't reprocess
- return;
- }
-
- // set dispute state
- cleanupRetryMap(uid);
- if (!dispute.getChatMessages().contains(chatMessage)) {
- dispute.addAndPersistChatMessage(chatMessage);
- } else {
- log.warn("We got a dispute mail msg that we have already stored. TradeId = " + chatMessage.getTradeId());
- }
- dispute.setIsClosed();
- if (dispute.disputeResultProperty().get() != null) {
- log.info("We already got a dispute result, indicating the message was resent after updating multisig info. TradeId = " + tradeId);
- }
- dispute.setDisputeResult(disputeResult);
-
- // sync and save wallet
- if (!trade.isPayoutPublished()) {
- trade.syncAndPollWallet();
- trade.saveWallet();
- }
-
- // update multisig hex
- if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex());
- if (trade.walletExists()) trade.importMultisigHex();
-
- // attempt to sign and publish dispute payout tx if given and not already published
- if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) {
-
- // wait to sign and publish payout tx if defer flag set
- if (disputeClosedMessage.isDeferPublishPayout()) {
- log.info("Deferring signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
- for (int i = 0; i < 5; i++) {
- if (trade.isPayoutPublished()) break;
- GenUtils.waitFor(Trade.DEFER_PUBLISH_MS / 5);
+ // get dispute
+ Optional disputeOptional = findDispute(disputeResult);
+ String uid = disputeClosedMessage.getUid();
+ if (!disputeOptional.isPresent()) {
+ log.warn("We got a dispute closed msg but we don't have a matching dispute. " +
+ "That might happen when we get the DisputeClosedMessage before the dispute was created. " +
+ "We try again after 2 sec. to apply the DisputeClosedMessage. TradeId = " + tradeId);
+ if (!delayMsgMap.containsKey(uid)) {
+ // We delay 2 sec. to be sure the comm. msg gets added first
+ Timer timer = UserThread.runAfter(() -> handleDisputeClosedMessage(disputeClosedMessage), 2);
+ delayMsgMap.put(uid, timer);
+ } else {
+ log.warn("We got a dispute closed msg after we already repeated to apply the message after a delay. " +
+ "That should never happen. TradeId = " + tradeId);
}
- if (!trade.isPayoutPublished()) trade.syncAndPollWallet();
+ return;
+ }
+ dispute = disputeOptional.get();
+
+ // verify arbitrator signature
+ String summaryText = chatMessage.getMessage();
+ if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId()));
+ if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring
+ else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail if arbitrator is unregistered)
+
+ // save dispute closed message for reprocessing
+ trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage);
+ requestPersistence();
+
+ // verify arbitrator does not receive DisputeClosedMessage
+ if (keyRing.getPubKeyRing().equals(dispute.getAgentPubKeyRing())) {
+ log.error("Arbitrator received disputeResultMessage. That should never happen.");
+ trade.getArbitrator().setDisputeClosedMessage(null); // don't reprocess
+ return;
}
- // sign and publish dispute payout tx if peer still has not published
- if (trade.isPayoutPublished()) {
- log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
+ // set dispute state
+ cleanupRetryMap(uid);
+ if (!dispute.getChatMessages().contains(chatMessage)) {
+ dispute.addAndPersistChatMessage(chatMessage);
} else {
- try {
- log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
- signAndPublishDisputePayoutTx(trade);
- } catch (Exception e) {
+ log.warn("We got a dispute mail msg that we have already stored. TradeId = " + chatMessage.getTradeId());
+ }
+ dispute.setIsClosed();
+ if (dispute.disputeResultProperty().get() != null) {
+ log.info("We already got a dispute result, indicating the message was resent after updating multisig info. TradeId = " + tradeId);
+ }
+ dispute.setDisputeResult(disputeResult);
- // check if payout published again
- trade.syncAndPollWallet();
- if (trade.isPayoutPublished()) {
- log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
- } else {
- throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId);
+ // sync and save wallet
+ if (!trade.isPayoutPublished()) {
+ trade.syncAndPollWallet();
+ trade.saveWallet();
+ }
+
+ // update multisig hex
+ if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex());
+ if (trade.walletExists()) trade.importMultisigHex();
+
+ // attempt to sign and publish dispute payout tx if given and not already published
+ if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) {
+
+ // wait to sign and publish payout tx if defer flag set
+ if (disputeClosedMessage.isDeferPublishPayout()) {
+ log.info("Deferring signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
+ for (int i = 0; i < 5; i++) {
+ if (trade.isPayoutPublished()) break;
+ GenUtils.waitFor(Trade.DEFER_PUBLISH_MS / 5);
+ }
+ if (!trade.isPayoutPublished()) trade.syncAndPollWallet();
+ }
+
+ // sign and publish dispute payout tx if peer still has not published
+ if (trade.isPayoutPublished()) {
+ log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
+ } else {
+ try {
+ log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
+ signAndPublishDisputePayoutTx(trade);
+ } catch (Exception e) {
+
+ // check if payout published again
+ trade.syncAndPollWallet();
+ if (trade.isPayoutPublished()) {
+ log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
+ } else {
+ throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId);
+ }
}
}
+ } else {
+ if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
+ else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId());
}
- } else {
- if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
- else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId());
- }
- // complete disputed trade
- if (trade.isPayoutPublished()) {
- tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED);
- }
+ // complete disputed trade
+ if (trade.isPayoutPublished()) {
+ tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED);
+ }
- // We use the chatMessage as we only persist those not the DisputeClosedMessage.
- // If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage.
- sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null);
- requestPersistence();
- } catch (Exception e) {
- log.warn("Error processing dispute closed message: " + e.getMessage());
- e.printStackTrace();
- requestPersistence();
-
- // nack bad message and do not reprocess
- if (e instanceof IllegalArgumentException) {
- trade.getArbitrator().setDisputeClosedMessage(null); // message is processed
- sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), false, e.getMessage());
+ // We use the chatMessage as we only persist those not the DisputeClosedMessage.
+ // If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage.
+ sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null);
+ requestPersistence();
+ } catch (Exception e) {
+ log.warn("Error processing dispute closed message: " + e.getMessage());
+ e.printStackTrace();
requestPersistence();
- throw e;
- }
- // schedule to reprocess message unless deleted
- if (trade.getArbitrator().getDisputeClosedMessage() != null) {
- if (!reprocessDisputeClosedMessageCounts.containsKey(trade.getId())) reprocessDisputeClosedMessageCounts.put(trade.getId(), 0);
- UserThread.runAfter(() -> {
- reprocessDisputeClosedMessageCounts.put(trade.getId(), reprocessDisputeClosedMessageCounts.get(trade.getId()) + 1); // increment reprocess count
- maybeReprocessDisputeClosedMessage(trade, reprocessOnError);
- }, trade.getReprocessDelayInSeconds(reprocessDisputeClosedMessageCounts.get(trade.getId())));
+ // nack bad message and do not reprocess
+ if (e instanceof IllegalArgumentException) {
+ trade.getArbitrator().setDisputeClosedMessage(null); // message is processed
+ sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), false, e.getMessage());
+ requestPersistence();
+ throw e;
+ }
+
+ // schedule to reprocess message unless deleted
+ if (trade.getArbitrator().getDisputeClosedMessage() != null) {
+ if (!reprocessDisputeClosedMessageCounts.containsKey(trade.getId())) reprocessDisputeClosedMessageCounts.put(trade.getId(), 0);
+ UserThread.runAfter(() -> {
+ reprocessDisputeClosedMessageCounts.put(trade.getId(), reprocessDisputeClosedMessageCounts.get(trade.getId()) + 1); // increment reprocess count
+ maybeReprocessDisputeClosedMessage(trade, reprocessOnError);
+ }, trade.getReprocessDelayInSeconds(reprocessDisputeClosedMessageCounts.get(trade.getId())));
+ }
}
}
- }
+ }, trade.getId());
}
public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) {
diff --git a/core/src/main/java/haveno/core/trade/HavenoUtils.java b/core/src/main/java/haveno/core/trade/HavenoUtils.java
index 360c3730..46134a99 100644
--- a/core/src/main/java/haveno/core/trade/HavenoUtils.java
+++ b/core/src/main/java/haveno/core/trade/HavenoUtils.java
@@ -44,18 +44,8 @@ import java.security.PrivateKey;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import monero.common.MoneroRpcConnection;
@@ -79,9 +69,6 @@ public class HavenoUtils {
private static final BigInteger XMR_AU_MULTIPLIER = new BigInteger("1000000000000");
public static final DecimalFormat XMR_FORMATTER = new DecimalFormat("##############0.000000000000", DECIMAL_FORMAT_SYMBOLS);
public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
- private static final int POOL_SIZE = 10;
- private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
- private static final Map POOLS = new HashMap<>();
// TODO: better way to share references?
public static ArbitrationManager arbitrationManager;
@@ -474,86 +461,6 @@ public class HavenoUtils {
}
}
- public static Future> submitToPool(Runnable task) {
- return submitToPool(Arrays.asList(task)).get(0);
- }
-
- public static List> submitToPool(List tasks) {
- List> futures = new ArrayList<>();
- for (Runnable task : tasks) futures.add(POOL.submit(task));
- return futures;
- }
-
- public static Future> submitToSharedThread(Runnable task) {
- return submitToThread(task, HavenoUtils.class.getSimpleName());
- }
-
- public static Future> submitToThread(Runnable task, String threadId) {
- synchronized (POOLS) {
- if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1));
- return POOLS.get(threadId).submit(task);
- }
- }
-
- public static Future> awaitThread(Runnable task, String threadId) {
- Future> future = submitToThread(task, threadId);
- try {
- future.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return future;
- }
-
- public static void removeThreadId(String threadId) {
- synchronized (POOLS) {
- if (POOLS.containsKey(threadId)) {
- POOLS.get(threadId).shutdown();
- POOLS.remove(threadId);
- }
- }
- }
-
- // TODO: these are unused; remove? use monero-java awaitTasks() when updated
-
- public static Future> awaitTask(Runnable task) {
- return awaitTasks(Arrays.asList(task)).get(0);
- }
-
- public static List> awaitTasks(Collection tasks) {
- return awaitTasks(tasks, tasks.size());
- }
-
- public static List> awaitTasks(Collection tasks, int maxConcurrency) {
- return awaitTasks(tasks, maxConcurrency, null);
- }
-
- public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) {
- List> futures = new ArrayList<>();
- if (tasks.isEmpty()) return futures;
- ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
- for (Runnable task : tasks) futures.add(pool.submit(task));
- pool.shutdown();
-
- // interrupt after timeout
- if (timeoutSeconds != null) {
- try {
- if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow();
- } catch (InterruptedException e) {
- pool.shutdownNow();
- throw new RuntimeException(e);
- }
- }
-
- // throw exception from any tasks
- try {
- for (Future> future : futures) future.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return futures;
- }
-
public static String toCamelCase(String underscore) {
return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, underscore);
}
diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java
index 34991345..5896b5d7 100644
--- a/core/src/main/java/haveno/core/trade/Trade.java
+++ b/core/src/main/java/haveno/core/trade/Trade.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import common.utils.GenUtils;
+import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.crypto.Encryption;
@@ -116,6 +117,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public abstract class Trade implements Tradable, Model {
private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_";
+ private static final long SHUTDOWN_TIMEOUT_MS = 90000;
private final Object walletLock = new Object();
private final Object pollLock = new Object();
private MoneroWallet wallet;
@@ -586,7 +588,7 @@ public abstract class Trade implements Tradable, Model {
///////////////////////////////////////////////////////////////////////////////////////////
public void initialize(ProcessModelServiceProvider serviceProvider) {
- synchronized (this) {
+ ThreadUtils.await(() -> {
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
// set arbitrator pub key ring once known
@@ -596,8 +598,8 @@ public abstract class Trade implements Tradable, Model {
// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
- HavenoUtils.submitToPool(() -> {
- HavenoUtils.submitToThread(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
+ ThreadUtils.submitToPool(() -> { // TODO: remove this?
+ ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
});
});
@@ -621,7 +623,7 @@ public abstract class Trade implements Tradable, Model {
// handle trade state events
tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod();
startPolling();
@@ -631,7 +633,7 @@ public abstract class Trade implements Tradable, Model {
// handle trade phase events
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
if (isPaymentReceived()) {
UserThread.execute(() -> {
@@ -646,7 +648,7 @@ public abstract class Trade implements Tradable, Model {
// handle payout events
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod();
// handle when payout published
@@ -677,7 +679,7 @@ public abstract class Trade implements Tradable, Model {
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
deleteWallet();
maybeClearProcessData();
if (idlePayoutSyncer != null) {
@@ -722,7 +724,7 @@ public abstract class Trade implements Tradable, Model {
// initialize syncing and polling
initSyncing();
- }
+ }, getId());
}
public void requestPersistence() {
@@ -787,8 +789,8 @@ public abstract class Trade implements Tradable, Model {
// check wallet connection on same thread as connection change
CountDownLatch latch = new CountDownLatch(1);
- HavenoUtils.submitToPool((() -> {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.submitToPool((() -> {
+ ThreadUtils.execute(() -> {
if (!isWalletConnectedToDaemon()) throw new RuntimeException("Trade wallet is not connected to a Monero node"); // wallet connection is updated on trade thread
latch.countDown();
}, getConnectionChangedThreadId());
@@ -1222,36 +1224,47 @@ public abstract class Trade implements Tradable, Model {
}
public void onShutDownStarted() {
+ if (wallet != null) log.info("Preparing to shut down {} {}", getClass().getSimpleName(), getId());
isShutDownStarted = true;
- if (wallet != null) log.info("{} {} preparing for shut down", getClass().getSimpleName(), getId());
stopPolling();
-
- // repeatedly acquire trade lock to allow other threads to finish
- for (int i = 0; i < 20; i++) {
- synchronized (this) {
- synchronized (walletLock) {
- if (isShutDown) break;
- }
- }
- }
}
public void shutDown() {
- if (!isPayoutUnlocked()) log.info("{} {} shutting down", getClass().getSimpleName(), getId());
- synchronized (this) {
- isInitialized = false;
- isShutDown = true;
- synchronized (walletLock) {
- if (wallet != null) {
- xmrWalletService.saveWallet(wallet, false); // skip backup
- stopWallet();
- }
+ if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());
+
+ // shut down thread pools with timeout
+ List tasks = new ArrayList<>();
+ tasks.add(() -> ThreadUtils.shutDown(getId(), SHUTDOWN_TIMEOUT_MS));
+ tasks.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId(), SHUTDOWN_TIMEOUT_MS));
+ try {
+ ThreadUtils.awaitTasks(tasks);
+ } catch (Exception e) {
+ log.warn("Timeout shutting down {} {}", getClass().getSimpleName(), getId());
+
+ // force stop wallet
+ if (wallet != null) {
+ log.warn("Force stopping wallet for {} {}", getClass().getSimpleName(), getId());
+ xmrWalletService.stopWallet(wallet, wallet.getPath(), true);
+ wallet = null;
}
+ }
+
+ // de-initialize
+ isInitialized = false;
+ isShutDown = true;
+ if (idlePayoutSyncer != null) {
+ xmrWalletService.removeWalletListener(idlePayoutSyncer);
+ idlePayoutSyncer = null;
+ }
+ if (wallet != null) {
+ xmrWalletService.saveWallet(wallet, false); // skip backup
+ stopWallet();
+ }
+ UserThread.execute(() -> {
if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe();
if (tradePhaseSubscription != null) tradePhaseSubscription.unsubscribe();
if (payoutStateSubscription != null) payoutStateSubscription.unsubscribe();
- idlePayoutSyncer = null; // main wallet removes listener itself
- }
+ });
}
///////////////////////////////////////////////////////////////////////////////////////////
@@ -1262,11 +1275,6 @@ public abstract class Trade implements Tradable, Model {
public void onComplete() {
}
- public void onRemoved() {
- HavenoUtils.removeThreadId(getId());
- HavenoUtils.removeThreadId(getConnectionChangedThreadId());
- }
-
///////////////////////////////////////////////////////////////////////////////////////////
// Abstract
@@ -1864,7 +1872,7 @@ public abstract class Trade implements Tradable, Model {
// sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
- HavenoUtils.submitToPool(() -> initSyncing());
+ ThreadUtils.execute(() -> initSyncing(), getId());
}
}
}
@@ -1875,11 +1883,9 @@ public abstract class Trade implements Tradable, Model {
initSyncingAux();
} else {
long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getWalletRefreshPeriod()); // random time to start syncing
- UserThread.runAfter(() -> {
- HavenoUtils.submitToPool(() -> {
- if (!isShutDownStarted) initSyncingAux();
- });
- }, startSyncingInMs / 1000l);
+ UserThread.runAfter(() -> ThreadUtils.execute(() -> {
+ if (!isShutDownStarted) initSyncingAux();
+ }, getId()), startSyncingInMs / 1000l);
}
}
@@ -2108,13 +2114,11 @@ public abstract class Trade implements Tradable, Model {
@Override
public void onNewBlock(long height) {
- HavenoUtils.submitToThread(() -> { // allow rapid notifications
+ ThreadUtils.execute(() -> { // allow rapid notifications
// skip rapid succession blocks
- synchronized (this) {
- if (processing) return;
- processing = true;
- }
+ if (processing) return;
+ processing = true;
// skip if not idling and not waiting for payout to unlock
if (!isIdling() || !isPayoutPublished() || isPayoutUnlocked()) {
diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java
index 0a7e4134..0c722b92 100644
--- a/core/src/main/java/haveno/core/trade/TradeManager.java
+++ b/core/src/main/java/haveno/core/trade/TradeManager.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
import common.utils.GenUtils;
import haveno.common.ClockWatcher;
+import haveno.common.ThreadUtils;
import haveno.common.crypto.KeyRing;
import haveno.common.crypto.PubKeyRing;
import haveno.common.handlers.ErrorMessageHandler;
@@ -234,7 +235,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
if (!(networkEnvelope instanceof TradeMessage)) return;
String tradeId = ((TradeMessage) networkEnvelope).getTradeId();
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
if (networkEnvelope instanceof InitTradeRequest) {
handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer);
} else if (networkEnvelope instanceof InitMultisigRequest) {
@@ -315,10 +316,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
try {
- HavenoUtils.awaitTasks(tasks);
+ ThreadUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Error notifying trades that shut down started: {}", e.getMessage());
- e.printStackTrace();
+ throw e;
}
}
@@ -345,7 +346,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
try {
- HavenoUtils.awaitTasks(tasks);
+ ThreadUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Error shutting down trades: {}", e.getMessage());
e.printStackTrace();
@@ -413,7 +414,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
};
- HavenoUtils.awaitTasks(tasks, threadPoolSize);
+ ThreadUtils.awaitTasks(tasks, threadPoolSize);
log.info("Done initializing persisted trades");
if (isShutDownStarted) return;
@@ -422,7 +423,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// sync idle trades once in background after active trades
for (Trade trade : trades) {
- if (trade.isIdling()) HavenoUtils.submitToPool(() -> trade.syncAndPollWallet());
+ if (trade.isIdling()) ThreadUtils.submitToPool(() -> trade.syncAndPollWallet());
}
// process after all wallets initialized
@@ -1224,7 +1225,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// remove trade
tradableList.remove(trade);
- trade.onRemoved();
// unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
diff --git a/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java
index f6a6e351..01017aff 100644
--- a/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/BuyerAsMakerProtocol.java
@@ -17,6 +17,7 @@ e * This file is part of Haveno.
package haveno.core.trade.protocol;
+import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.BuyerAsMakerTrade;
import haveno.core.trade.Trade;
@@ -43,7 +44,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
- new Thread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
@@ -66,6 +67,6 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
.executeTasks(true);
awaitTradeLatch();
}
- }).start();
+ }, trade.getId());
}
}
diff --git a/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java
index 962fc6fb..7baacee2 100644
--- a/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/BuyerAsTakerProtocol.java
@@ -18,6 +18,7 @@
package haveno.core.trade.protocol;
+import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.BuyerAsTakerTrade;
import haveno.core.trade.Trade;
@@ -47,7 +48,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void onTakeOffer(TradeResultHandler tradeResultHandler,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".onTakeOffer()");
- new Thread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.tradeResultHandler = tradeResultHandler;
@@ -71,6 +72,6 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
.executeTasks(true);
awaitTradeLatch();
}
- }).start();
+ }, trade.getId());
}
}
diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java
index 6ebd8021..c31d6453 100644
--- a/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/SellerAsMakerProtocol.java
@@ -18,6 +18,7 @@
package haveno.core.trade.protocol;
+import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.SellerAsMakerTrade;
import haveno.core.trade.Trade;
@@ -48,7 +49,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
- new Thread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
@@ -71,6 +72,6 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
.executeTasks(true);
awaitTradeLatch();
}
- }).start();
+ }, trade.getId());
}
}
diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java
index 98bfa7e6..a9898301 100644
--- a/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/SellerAsTakerProtocol.java
@@ -18,6 +18,7 @@
package haveno.core.trade.protocol;
+import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler;
import haveno.core.trade.SellerAsTakerTrade;
import haveno.core.trade.Trade;
@@ -48,7 +49,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
public void onTakeOffer(TradeResultHandler tradeResultHandler,
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getSimpleName() + ".onTakeOffer()");
- new Thread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (trade) {
latchTrade();
this.tradeResultHandler = tradeResultHandler;
@@ -72,6 +73,6 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
.executeTasks(true);
awaitTradeLatch();
}
- }).start();
+ }, trade.getId());
}
}
diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
index d9ca7a0a..0fc2b18f 100644
--- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
@@ -17,6 +17,7 @@
package haveno.core.trade.protocol;
+import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing;
@@ -113,12 +114,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
- HavenoUtils.submitToThread(() -> handle(message, peerNodeAddress), trade.getId());
+ ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId());
}
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
- handle(message, peerNodeAddress);
+ ThreadUtils.execute(() -> handle(message, peerNodeAddress), trade.getId());
}
private void handle(TradeMessage message, NodeAddress peerNodeAddress) {
@@ -264,7 +265,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
public void maybeSendDepositsConfirmedMessages() {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return;
synchronized (trade) {
if (!trade.isInitialized() || trade.isShutDownStarted()) return; // skip if shutting down
@@ -285,7 +286,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (trade) {
// skip if no need to reprocess
@@ -296,190 +297,198 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
}
-
- handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
}, trade.getId());
}
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest()");
- synchronized (trade) {
+ ThreadUtils.execute(() -> {
+ synchronized (trade) {
- // check trade
- if (trade.hasFailed()) {
- log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), request.getClass().getSimpleName(), sender, trade.getErrorMessage());
- return;
+ // check trade
+ if (trade.hasFailed()) {
+ log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), request.getClass().getSimpleName(), sender, trade.getErrorMessage());
+ return;
+ }
+ Validator.checkTradeId(processModel.getOfferId(), request);
+
+ // process message
+ latchTrade();
+ processModel.setTradeMessage(request);
+ expect(anyPhase(Trade.Phase.INIT)
+ .with(request)
+ .from(sender))
+ .setup(tasks(
+ ProcessInitMultisigRequest.class,
+ MaybeSendSignContractRequest.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ startTimeout(TRADE_TIMEOUT);
+ handleTaskRunnerSuccess(sender, request);
+ },
+ errorMessage -> {
+ handleTaskRunnerFault(sender, request, errorMessage);
+ }))
+ .withTimeout(TRADE_TIMEOUT))
+ .executeTasks(true);
+ awaitTradeLatch();
}
- Validator.checkTradeId(processModel.getOfferId(), request);
-
- // proocess message
- latchTrade();
- processModel.setTradeMessage(request);
- expect(anyPhase(Trade.Phase.INIT)
- .with(request)
- .from(sender))
- .setup(tasks(
- ProcessInitMultisigRequest.class,
- MaybeSendSignContractRequest.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- startTimeout(TRADE_TIMEOUT);
- handleTaskRunnerSuccess(sender, request);
- },
- errorMessage -> {
- handleTaskRunnerFault(sender, request, errorMessage);
- }))
- .withTimeout(TRADE_TIMEOUT))
- .executeTasks(true);
- awaitTradeLatch();
- }
+ }, trade.getId());
}
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() " + trade.getId());
- synchronized (trade) {
+ ThreadUtils.execute(() -> {
+ synchronized (trade) {
- // check trade
- if (trade.hasFailed()) {
- log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage());
- return;
- }
- Validator.checkTradeId(processModel.getOfferId(), message);
-
- // process message
- if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
- latchTrade();
+ // check trade
+ if (trade.hasFailed()) {
+ log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage());
+ return;
+ }
Validator.checkTradeId(processModel.getOfferId(), message);
- processModel.setTradeMessage(message);
- expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED)
- .with(message)
- .from(sender))
- .setup(tasks(
- // TODO (woodser): validate request
- ProcessSignContractRequest.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- startTimeout(TRADE_TIMEOUT);
- handleTaskRunnerSuccess(sender, message);
- },
- errorMessage -> {
- handleTaskRunnerFault(sender, message, errorMessage);
- }))
- .withTimeout(TRADE_TIMEOUT)) // extend timeout
- .executeTasks(true);
- awaitTradeLatch();
- } else {
-
- // process sign contract request after multisig created
- EasyBind.subscribe(trade.stateProperty(), state -> {
- if (state == Trade.State.MULTISIG_COMPLETED) HavenoUtils.submitToThread(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock
- });
+
+ // process message
+ if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
+ latchTrade();
+ Validator.checkTradeId(processModel.getOfferId(), message);
+ processModel.setTradeMessage(message);
+ expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED)
+ .with(message)
+ .from(sender))
+ .setup(tasks(
+ // TODO (woodser): validate request
+ ProcessSignContractRequest.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ startTimeout(TRADE_TIMEOUT);
+ handleTaskRunnerSuccess(sender, message);
+ },
+ errorMessage -> {
+ handleTaskRunnerFault(sender, message, errorMessage);
+ }))
+ .withTimeout(TRADE_TIMEOUT)) // extend timeout
+ .executeTasks(true);
+ awaitTradeLatch();
+ } else {
+
+ // process sign contract request after multisig created
+ EasyBind.subscribe(trade.stateProperty(), state -> {
+ if (state == Trade.State.MULTISIG_COMPLETED) ThreadUtils.execute(() -> handleSignContractRequest(message, sender), trade.getId()); // process notification without trade lock
+ });
+ }
}
- }
+ }, trade.getId());
}
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() " + trade.getId());
- synchronized (trade) {
+ ThreadUtils.execute(() -> {
+ synchronized (trade) {
- // check trade
- if (trade.hasFailed()) {
- log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage());
- return;
- }
- Validator.checkTradeId(processModel.getOfferId(), message);
-
- // process message
- if (trade.getState() == Trade.State.CONTRACT_SIGNED) {
- latchTrade();
+ // check trade
+ if (trade.hasFailed()) {
+ log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), message.getClass().getSimpleName(), sender, trade.getErrorMessage());
+ return;
+ }
Validator.checkTradeId(processModel.getOfferId(), message);
- processModel.setTradeMessage(message);
- expect(state(Trade.State.CONTRACT_SIGNED)
- .with(message)
- .from(sender))
- .setup(tasks(
- // TODO (woodser): validate request
- ProcessSignContractResponse.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- startTimeout(TRADE_TIMEOUT);
- handleTaskRunnerSuccess(sender, message);
- },
- errorMessage -> {
- handleTaskRunnerFault(sender, message, errorMessage);
- }))
- .withTimeout(TRADE_TIMEOUT)) // extend timeout
- .executeTasks(true);
- awaitTradeLatch();
- } else {
-
- // process sign contract response after contract signed
- EasyBind.subscribe(trade.stateProperty(), state -> {
- if (state == Trade.State.CONTRACT_SIGNED) HavenoUtils.submitToThread(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock
- });
+
+ // process message
+ if (trade.getState() == Trade.State.CONTRACT_SIGNED) {
+ latchTrade();
+ Validator.checkTradeId(processModel.getOfferId(), message);
+ processModel.setTradeMessage(message);
+ expect(state(Trade.State.CONTRACT_SIGNED)
+ .with(message)
+ .from(sender))
+ .setup(tasks(
+ // TODO (woodser): validate request
+ ProcessSignContractResponse.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ startTimeout(TRADE_TIMEOUT);
+ handleTaskRunnerSuccess(sender, message);
+ },
+ errorMessage -> {
+ handleTaskRunnerFault(sender, message, errorMessage);
+ }))
+ .withTimeout(TRADE_TIMEOUT)) // extend timeout
+ .executeTasks(true);
+ awaitTradeLatch();
+ } else {
+
+ // process sign contract response after contract signed
+ EasyBind.subscribe(trade.stateProperty(), state -> {
+ if (state == Trade.State.CONTRACT_SIGNED) ThreadUtils.execute(() -> handleSignContractResponse(message, sender), trade.getId()); // process notification without trade lock
+ });
+ }
}
- }
+ }, trade.getId());
}
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleDepositResponse()");
- synchronized (trade) {
+ ThreadUtils.execute(() -> {
+ synchronized (trade) {
- // check trade
- if (trade.hasFailed()) {
- log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), response.getClass().getSimpleName(), sender, trade.getErrorMessage());
- return;
+ // check trade
+ if (trade.hasFailed()) {
+ log.warn("{} {} ignoring {} from {} because trade failed with previous error: {}", trade.getClass().getSimpleName(), trade.getId(), response.getClass().getSimpleName(), sender, trade.getErrorMessage());
+ return;
+ }
+ Validator.checkTradeId(processModel.getOfferId(), response);
+
+ // process message
+ latchTrade();
+ processModel.setTradeMessage(response);
+ expect(anyState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK)
+ .with(response)
+ .from(sender))
+ .setup(tasks(
+ ProcessDepositResponse.class,
+ RemoveOffer.class,
+ SellerPublishTradeStatistics.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ stopTimeout();
+ this.errorMessageHandler = null; // TODO: set this when trade state is >= DEPOSIT_PUBLISHED
+ handleTaskRunnerSuccess(sender, response);
+ if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized
+ },
+ errorMessage -> {
+ handleTaskRunnerFault(sender, response, errorMessage);
+ }))
+ .withTimeout(TRADE_TIMEOUT))
+ .executeTasks(true);
+ awaitTradeLatch();
}
- Validator.checkTradeId(processModel.getOfferId(), response);
-
- // process message
- latchTrade();
- processModel.setTradeMessage(response);
- expect(anyState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK)
- .with(response)
- .from(sender))
- .setup(tasks(
- ProcessDepositResponse.class,
- RemoveOffer.class,
- SellerPublishTradeStatistics.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- stopTimeout();
- this.errorMessageHandler = null; // TODO: set this when trade state is >= DEPOSIT_PUBLISHED
- handleTaskRunnerSuccess(sender, response);
- if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized
- },
- errorMessage -> {
- handleTaskRunnerFault(sender, response, errorMessage);
- }))
- .withTimeout(TRADE_TIMEOUT))
- .executeTasks(true);
- awaitTradeLatch();
- }
+ }, trade.getId());
}
public void handle(DepositsConfirmedMessage response, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)");
- synchronized (trade) {
- latchTrade();
- this.errorMessageHandler = null;
- expect(new Condition(trade)
- .with(response)
- .from(sender))
- .setup(tasks(
- ProcessDepositsConfirmedMessage.class,
- VerifyPeersAccountAgeWitness.class,
- MaybeResendDisputeClosedMessageWithPayout.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- handleTaskRunnerSuccess(sender, response);
- },
- errorMessage -> {
- handleTaskRunnerFault(sender, response, errorMessage);
- })))
- .executeTasks();
- awaitTradeLatch();
- }
+ ThreadUtils.execute(() -> {
+ synchronized (trade) {
+ latchTrade();
+ this.errorMessageHandler = null;
+ expect(new Condition(trade)
+ .with(response)
+ .from(sender))
+ .setup(tasks(
+ ProcessDepositsConfirmedMessage.class,
+ VerifyPeersAccountAgeWitness.class,
+ MaybeResendDisputeClosedMessageWithPayout.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ handleTaskRunnerSuccess(sender, response);
+ },
+ errorMessage -> {
+ handleTaskRunnerFault(sender, response, errorMessage);
+ })))
+ .executeTasks();
+ awaitTradeLatch();
+ }
+ }, trade.getId());
}
// received by seller and arbitrator
@@ -489,43 +498,45 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
log.warn("Ignoring PaymentSentMessage since not seller or arbitrator");
return;
}
- // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case
- // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received
- // a mailbox message with PaymentSentMessage.
- // TODO A better fix would be to add a listener for the wallet sync state and process
- // the mailbox msg once wallet is ready and trade state set.
- synchronized (trade) {
- if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
- log.warn("Received another PaymentSentMessage which was already processed, ACKing");
- handleTaskRunnerSuccess(peer, message);
- return;
+ ThreadUtils.execute(() -> {
+ // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case
+ // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received
+ // a mailbox message with PaymentSentMessage.
+ // TODO A better fix would be to add a listener for the wallet sync state and process
+ // the mailbox msg once wallet is ready and trade state set.
+ synchronized (trade) {
+ if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
+ log.warn("Received another PaymentSentMessage which was already processed, ACKing");
+ handleTaskRunnerSuccess(peer, message);
+ return;
+ }
+ if (trade.getPayoutTx() != null) {
+ log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
+ "so we ignore the message. This can happen if the ACK message to the peer did not " +
+ "arrive and the peer repeats sending us the message. We send another ACK msg.");
+ sendAckMessage(peer, message, true, null);
+ removeMailboxMessageAfterProcessing(message);
+ return;
+ }
+ latchTrade();
+ expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
+ .with(message)
+ .from(peer))
+ .setup(tasks(
+ ApplyFilter.class,
+ ProcessPaymentSentMessage.class,
+ VerifyPeersAccountAgeWitness.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ handleTaskRunnerSuccess(peer, message);
+ },
+ (errorMessage) -> {
+ handleTaskRunnerFault(peer, message, errorMessage);
+ })))
+ .executeTasks(true);
+ awaitTradeLatch();
}
- if (trade.getPayoutTx() != null) {
- log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
- "so we ignore the message. This can happen if the ACK message to the peer did not " +
- "arrive and the peer repeats sending us the message. We send another ACK msg.");
- sendAckMessage(peer, message, true, null);
- removeMailboxMessageAfterProcessing(message);
- return;
- }
- latchTrade();
- expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
- .with(message)
- .from(peer))
- .setup(tasks(
- ApplyFilter.class,
- ProcessPaymentSentMessage.class,
- VerifyPeersAccountAgeWitness.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- handleTaskRunnerSuccess(peer, message);
- },
- (errorMessage) -> {
- handleTaskRunnerFault(peer, message, errorMessage);
- })))
- .executeTasks(true);
- awaitTradeLatch();
- }
+ }, trade.getId());
}
// received by buyer and arbitrator
@@ -535,56 +546,58 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
private void handle(PaymentReceivedMessage message, NodeAddress peer, boolean reprocessOnError) {
System.out.println(getClass().getSimpleName() + ".handle(PaymentReceivedMessage)");
- if (!(trade instanceof BuyerTrade || trade instanceof ArbitratorTrade)) {
- log.warn("Ignoring PaymentReceivedMessage since not buyer or arbitrator");
- return;
- }
- synchronized (trade) {
- latchTrade();
- Validator.checkTradeId(processModel.getOfferId(), message);
- processModel.setTradeMessage(message);
-
- // check minimum trade phase
- if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) {
- log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
- return;
- }
- if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) {
- log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
- return;
- }
- if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) {
- log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
+ ThreadUtils.execute(() -> {
+ if (!(trade instanceof BuyerTrade || trade instanceof ArbitratorTrade)) {
+ log.warn("Ignoring PaymentReceivedMessage since not buyer or arbitrator");
return;
}
+ synchronized (trade) {
+ latchTrade();
+ Validator.checkTradeId(processModel.getOfferId(), message);
+ processModel.setTradeMessage(message);
- expect(anyPhase()
- .with(message)
- .from(peer))
- .setup(tasks(
- ProcessPaymentReceivedMessage.class)
- .using(new TradeTaskRunner(trade,
- () -> {
- handleTaskRunnerSuccess(peer, message);
- },
- errorMessage -> {
- log.warn("Error processing payment received message: " + errorMessage);
- processModel.getTradeManager().requestPersistence();
+ // check minimum trade phase
+ if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) {
+ log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
+ return;
+ }
+ if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) {
+ log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
+ return;
+ }
+ if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) {
+ log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
+ return;
+ }
- // schedule to reprocess message unless deleted
- if (trade.getSeller().getPaymentReceivedMessage() != null) {
- UserThread.runAfter(() -> {
- reprocessPaymentReceivedMessageCount++;
- maybeReprocessPaymentReceivedMessage(reprocessOnError);
- }, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount));
- } else {
- handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack
- }
- unlatchTrade();
- })))
- .executeTasks(true);
- awaitTradeLatch();
- }
+ expect(anyPhase()
+ .with(message)
+ .from(peer))
+ .setup(tasks(
+ ProcessPaymentReceivedMessage.class)
+ .using(new TradeTaskRunner(trade,
+ () -> {
+ handleTaskRunnerSuccess(peer, message);
+ },
+ errorMessage -> {
+ log.warn("Error processing payment received message: " + errorMessage);
+ processModel.getTradeManager().requestPersistence();
+
+ // schedule to reprocess message unless deleted
+ if (trade.getSeller().getPaymentReceivedMessage() != null) {
+ UserThread.runAfter(() -> {
+ reprocessPaymentReceivedMessageCount++;
+ maybeReprocessPaymentReceivedMessage(reprocessOnError);
+ }, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount));
+ } else {
+ handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack
+ }
+ unlatchTrade();
+ })))
+ .executeTasks(true);
+ awaitTradeLatch();
+ }
+ }, trade.getId());
}
public void onWithdrawCompleted() {
diff --git a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java
index f68432f5..e75fdb0e 100644
--- a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java
+++ b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java
@@ -4,6 +4,7 @@ import com.google.common.util.concurrent.Service.State;
import com.google.inject.name.Named;
import common.utils.JsonUtils;
+import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.file.FileUtil;
@@ -707,7 +708,11 @@ public class XmrWalletService {
public void onShutDownStarted() {
log.info("XmrWalletService.onShutDownStarted()");
this.isShutDownStarted = true;
+ }
+ public void shutDown() {
+ log.info("Shutting down {}", getClass().getSimpleName());
+
// remove listeners which stops polling wallet
// TODO monero-java: wallet.stopPolling()?
synchronized (walletLock) {
@@ -717,15 +722,11 @@ public class XmrWalletService {
}
}
}
- }
-
- public void shutDown() {
- log.info("Shutting down {}", getClass().getSimpleName());
// shut down trade and main wallets at same time
walletListeners.clear();
closeMainWallet(true);
- log.info("Done shutting down all wallets");
+ log.info("Done shutting down main wallet");
}
// ------------------------------ PRIVATE HELPERS -------------------------
@@ -734,7 +735,7 @@ public class XmrWalletService {
// listen for connection changes
xmrConnectionService.addConnectionListener(connection -> {
- HavenoUtils.submitToThread(() -> onConnectionChanged(connection), THREAD_ID);
+ ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID);
});
// initialize main wallet when daemon synced
@@ -744,7 +745,7 @@ public class XmrWalletService {
}
private void initMainWalletIfConnected() {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
synchronized (walletLock) {
if (xmrConnectionService.downloadPercentageProperty().get() == 1 && wallet == null && !isShutDownStarted) {
maybeInitMainWallet(true);
@@ -817,12 +818,12 @@ public class XmrWalletService {
// reschedule to init main wallet
UserThread.runAfter(() -> {
- HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID);
+ ThreadUtils.execute(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
} else {
log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000);
UserThread.runAfter(() -> {
- HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID);
+ ThreadUtils.execute(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
}
}
@@ -994,7 +995,7 @@ public class XmrWalletService {
// sync wallet on new thread
if (connection != null) {
wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE);
- HavenoUtils.submitToPool(() -> {
+ ThreadUtils.submitToPool(() -> {
if (isShutDownStarted) return;
wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs());
try {
@@ -1034,7 +1035,7 @@ public class XmrWalletService {
}
// excute tasks in parallel
- HavenoUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size()));
+ ThreadUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size()));
log.info("Done changing all wallet passwords");
}
@@ -1318,7 +1319,7 @@ public class XmrWalletService {
BigInteger balance;
if (balanceListener.getSubaddressIndex() != null && balanceListener.getSubaddressIndex() != 0) balance = getBalanceForSubaddress(balanceListener.getSubaddressIndex());
else balance = getAvailableBalance();
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
try {
balanceListener.onBalanceChanged(balance);
} catch (Exception e) {
@@ -1369,14 +1370,14 @@ public class XmrWalletService {
@Override
public void onSyncProgress(long height, long startHeight, long endHeight, double percentDone, String message) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message);
}, THREAD_ID);
}
@Override
public void onNewBlock(long height) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
walletHeight.set(height);
for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height);
}, THREAD_ID);
@@ -1384,7 +1385,7 @@ public class XmrWalletService {
@Override
public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance);
updateBalanceListeners();
}, THREAD_ID);
@@ -1392,14 +1393,14 @@ public class XmrWalletService {
@Override
public void onOutputReceived(MoneroOutputWallet output) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output);
}, THREAD_ID);
}
@Override
public void onOutputSpent(MoneroOutputWallet output) {
- HavenoUtils.submitToThread(() -> {
+ ThreadUtils.execute(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output);
}, THREAD_ID);
}
diff --git a/p2p/src/main/java/haveno/network/p2p/network/Connection.java b/p2p/src/main/java/haveno/network/p2p/network/Connection.java
index a154f34a..1eaa0e6a 100644
--- a/p2p/src/main/java/haveno/network/p2p/network/Connection.java
+++ b/p2p/src/main/java/haveno/network/p2p/network/Connection.java
@@ -32,6 +32,7 @@ import haveno.network.p2p.storage.payload.CapabilityRequiringPayload;
import haveno.network.p2p.storage.payload.PersistableNetworkPayload;
import haveno.common.Proto;
+import haveno.common.ThreadUtils;
import haveno.common.app.Capabilities;
import haveno.common.app.HasCapabilities;
import haveno.common.app.Version;
@@ -73,7 +74,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -109,7 +109,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(240);
private static final int SHUTDOWN_TIMEOUT = 100;
- private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); // one shared thread to handle messages sequentially
+ private static final String THREAD_ID = Connection.class.getSimpleName();
public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
@@ -212,7 +212,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
reportInvalidRequest(RuleViolation.PEER_BANNED);
}
}
- EXECUTOR.execute(() -> connectionListener.onConnection(this));
+ ThreadUtils.execute(() -> connectionListener.onConnection(this), THREAD_ID);
} catch (Throwable e) {
handleException(e);
}
@@ -266,8 +266,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!stopped) {
protoOutputStream.writeEnvelope(networkEnvelope);
- EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)));
- EXECUTOR.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize));
+ ThreadUtils.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)), THREAD_ID);
+ ThreadUtils.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize), THREAD_ID);
}
} catch (Throwable t) {
handleException(t);
@@ -396,7 +396,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (networkEnvelope instanceof BundleOfEnvelopes) {
onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection);
} else {
- EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
+ ThreadUtils.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)), THREAD_ID);
}
}
@@ -432,8 +432,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
envelopesToProcess.add(networkEnvelope);
}
}
- envelopesToProcess.forEach(envelope -> EXECUTOR.execute(() ->
- messageListeners.forEach(listener -> listener.onMessage(envelope, connection))));
+ envelopesToProcess.forEach(envelope -> ThreadUtils.execute(() -> {
+ messageListeners.forEach(listener -> listener.onMessage(envelope, connection));
+ }, THREAD_ID));
}
@@ -503,7 +504,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
t.printStackTrace();
} finally {
stopped = true;
- EXECUTOR.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
+ ThreadUtils.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID);
}
}, "Connection:SendCloseConnectionMessage-" + this.uid).start();
} else {
@@ -513,12 +514,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
} else {
//TODO find out why we get called that
log.debug("stopped was already at shutDown call");
- EXECUTOR.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
+ ThreadUtils.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler), THREAD_ID);
}
}
private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
- EXECUTOR.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
+ ThreadUtils.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this), THREAD_ID);
try {
protoOutputStream.onConnectionShutdown();
socket.close();
@@ -541,7 +542,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.debug("Connection shutdown complete {}", this);
if (shutDownCompleteHandler != null)
- EXECUTOR.execute(shutDownCompleteHandler);
+ ThreadUtils.execute(shutDownCompleteHandler, THREAD_ID);
}
}
@@ -844,8 +845,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.info("We got a {} from a peer with yet unknown address on connection with uid={}", networkEnvelope.getClass().getSimpleName(), uid);
}
- EXECUTOR.execute(() -> onMessage(networkEnvelope, this));
- EXECUTOR.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size));
+ ThreadUtils.execute(() -> onMessage(networkEnvelope, this), THREAD_ID);
+ ThreadUtils.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size), THREAD_ID);
}
} catch (InvalidClassException e) {
log.error(e.getMessage());
@@ -894,7 +895,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
- EXECUTOR.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
+ ThreadUtils.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities), THREAD_ID);
}
});
return false;