From e6775f3b58cdeaf75cabad58f7c3b7daf0c6acbe Mon Sep 17 00:00:00 2001 From: woodser Date: Sun, 17 Dec 2023 09:38:30 -0500 Subject: [PATCH] move processing off UserThread for smoother experience --- .../main/java/haveno/common/UserThread.java | 14 ++ .../haveno/core/api/XmrConnectionService.java | 70 ++++++---- .../haveno/core/app/HavenoExecutable.java | 13 +- .../java/haveno/core/app/WalletAppSetup.java | 10 +- .../app/misc/ExecutableForAppWithP2p.java | 17 ++- .../haveno/core/offer/OfferFilterService.java | 9 +- .../haveno/core/offer/OpenOfferManager.java | 106 ++++++++-------- .../tasks/MakerSendSignOfferRequest.java | 2 +- .../arbitration/ArbitrationManager.java | 4 +- .../java/haveno/core/trade/HavenoUtils.java | 46 +++---- .../main/java/haveno/core/trade/Trade.java | 10 +- .../java/haveno/core/trade/TradeManager.java | 14 +- .../core/xmr/wallet/XmrWalletService.java | 120 ++++++++---------- .../main/funds/deposit/DepositListItem.java | 9 +- .../main/offer/takeoffer/TakeOfferView.java | 16 ++- .../network/p2p/network/Connection.java | 31 +++-- 16 files changed, 276 insertions(+), 215 deletions(-) diff --git a/common/src/main/java/haveno/common/UserThread.java b/common/src/main/java/haveno/common/UserThread.java index 51019ee1..7265b3f4 100644 --- a/common/src/main/java/haveno/common/UserThread.java +++ b/common/src/main/java/haveno/common/UserThread.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -59,6 +60,19 @@ public class UserThread { UserThread.executor.execute(command); } + public static void await(Runnable command) { + CountDownLatch latch = new CountDownLatch(1); + executor.execute(() -> { + command.run(); + latch.countDown(); + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + // Prefer FxTimer if a delay is needed in a JavaFx class (gui module) public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) { return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS); diff --git a/core/src/main/java/haveno/core/api/XmrConnectionService.java b/core/src/main/java/haveno/core/api/XmrConnectionService.java index ae684745..fe0d83b9 100644 --- a/core/src/main/java/haveno/core/api/XmrConnectionService.java +++ b/core/src/main/java/haveno/core/api/XmrConnectionService.java @@ -20,6 +20,7 @@ import javafx.beans.property.LongProperty; import javafx.beans.property.ObjectProperty; import javafx.beans.property.ReadOnlyDoubleProperty; import javafx.beans.property.ReadOnlyIntegerProperty; +import javafx.beans.property.ReadOnlyLongProperty; import javafx.beans.property.ReadOnlyObjectProperty; import javafx.beans.property.SimpleIntegerProperty; import javafx.beans.property.SimpleLongProperty; @@ -61,9 +62,11 @@ public final class XmrConnectionService { private final MoneroConnectionManager connectionManager; private final EncryptedConnectionList connectionList; private final ObjectProperty> peers = new SimpleObjectProperty<>(); + private final ObjectProperty connectionProperty = new SimpleObjectProperty<>(); private final IntegerProperty numPeers = new SimpleIntegerProperty(0); private final LongProperty chainHeight = new SimpleLongProperty(0); private final DownloadListener downloadListener = new DownloadListener(); + private final LongProperty numUpdates = new SimpleLongProperty(0); private Socks5ProxyProvider socks5ProxyProvider; private boolean isInitialized; @@ -286,6 +289,10 @@ public final class XmrConnectionService { return peers; } + public ReadOnlyObjectProperty connectionProperty() { + return connectionProperty; + } + public boolean hasSufficientPeersForBroadcast() { return numPeers.get() >= getMinBroadcastConnections(); } @@ -306,6 +313,10 @@ public final class XmrConnectionService { return downloadPercentageProperty().get() == 1d; } + public ReadOnlyLongProperty numUpdatesProperty() { + return numUpdates; + } + // ------------------------------- HELPERS -------------------------------- private void doneDownload() { @@ -517,6 +528,12 @@ public final class XmrConnectionService { connectionList.addConnection(currentConnection); connectionList.setCurrentConnectionUri(currentConnection.getUri()); } + + // set connection property on user thread + UserThread.execute(() -> { + connectionProperty.set(currentConnection); + numUpdates.set(numUpdates.get() + 1); + }); } updatePolling(); @@ -564,31 +581,38 @@ public final class XmrConnectionService { if (daemon == null) throw new RuntimeException("No daemon connection"); lastInfo = daemon.getInfo(); - // set chain height - chainHeight.set(lastInfo.getHeight()); + // update properties on user thread + UserThread.execute(() -> { - // update sync progress - boolean isTestnet = Config.baseCurrencyNetwork() == BaseCurrencyNetwork.XMR_LOCAL; - if (lastInfo.isSynchronized() || isTestnet) doneDownload(); // TODO: skipping synchronized check for testnet because tests cannot sync 3rd local node, see "Can manage Monero daemon connections" - else if (lastInfo.isBusySyncing()) { - long targetHeight = lastInfo.getTargetHeight(); - long blocksLeft = targetHeight - lastInfo.getHeight(); - if (syncStartHeight == null) syncStartHeight = lastInfo.getHeight(); - double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, lastInfo.getHeight() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress - downloadListener.progress(percent, blocksLeft, null); - } + // set chain height + chainHeight.set(lastInfo.getHeight()); + + // update sync progress + boolean isTestnet = Config.baseCurrencyNetwork() == BaseCurrencyNetwork.XMR_LOCAL; + if (lastInfo.isSynchronized() || isTestnet) doneDownload(); // TODO: skipping synchronized check for testnet because tests cannot sync 3rd local node, see "Can manage Monero daemon connections" + else if (lastInfo.isBusySyncing()) { + long targetHeight = lastInfo.getTargetHeight(); + long blocksLeft = targetHeight - lastInfo.getHeight(); + if (syncStartHeight == null) syncStartHeight = lastInfo.getHeight(); + double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, lastInfo.getHeight() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress + downloadListener.progress(percent, blocksLeft, null); + } + + // set peer connections + // TODO: peers often uknown due to restricted RPC call, skipping call to get peer connections + // try { + // peers.set(getOnlinePeers()); + // } catch (Exception err) { + // // TODO: peers unknown due to restricted RPC call + // } + // numPeers.set(peers.get().size()); + numPeers.set(lastInfo.getNumOutgoingConnections() + lastInfo.getNumIncomingConnections()); + peers.set(new ArrayList()); + + // notify update + numUpdates.set(numUpdates.get() + 1); + }); - // set peer connections - // TODO: peers often uknown due to restricted RPC call, skipping call to get peer connections - // try { - // peers.set(getOnlinePeers()); - // } catch (Exception err) { - // // TODO: peers unknown due to restricted RPC call - // } - // numPeers.set(peers.get().size()); - numPeers.set(lastInfo.getNumOutgoingConnections() + lastInfo.getNumIncomingConnections()); - peers.set(new ArrayList()); - // handle error recovery if (lastErrorTimestamp != null) { log.info("Successfully fetched daemon info after previous error"); diff --git a/core/src/main/java/haveno/core/app/HavenoExecutable.java b/core/src/main/java/haveno/core/app/HavenoExecutable.java index eb155bd6..4d222985 100644 --- a/core/src/main/java/haveno/core/app/HavenoExecutable.java +++ b/core/src/main/java/haveno/core/app/HavenoExecutable.java @@ -42,11 +42,13 @@ import haveno.core.setup.CorePersistedDataHost; import haveno.core.setup.CoreSetup; import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager; import haveno.core.trade.HavenoUtils; +import haveno.core.trade.TradeManager; import haveno.core.trade.statistics.TradeStatisticsManager; import haveno.core.xmr.setup.WalletsSetup; import haveno.core.xmr.wallet.BtcWalletService; import haveno.core.xmr.wallet.XmrWalletService; import haveno.network.p2p.P2PService; +import haveno.network.p2p.network.Connection; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -337,7 +339,12 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven Set tasks = new HashSet(); tasks.add(() -> injector.getInstance(XmrWalletService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); - HavenoUtils.executeTasks(tasks); // notify in parallel + tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); + try { + HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout + } catch (Exception e) { + e.printStackTrace(); + } injector.getInstance(PriceFeedService.class).shutDown(); injector.getInstance(ArbitratorManager.class).shutDown(); @@ -357,6 +364,10 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven // shut down monero wallets and connections injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { + log.info("Shutting down connections"); + Connection.shutDownExecutor(30); + + // done shutting down log.info("Graceful shutdown completed. Exiting now."); module.close(injector); completeShutdown(resultHandler, EXIT_SUCCESS, systemExit); diff --git a/core/src/main/java/haveno/core/app/WalletAppSetup.java b/core/src/main/java/haveno/core/app/WalletAppSetup.java index 812933a9..fe39e1dd 100644 --- a/core/src/main/java/haveno/core/app/WalletAppSetup.java +++ b/core/src/main/java/haveno/core/app/WalletAppSetup.java @@ -109,13 +109,13 @@ public class WalletAppSetup { log.info("Initialize WalletAppSetup with monero-java version {}", MoneroUtils.getVersion()); ObjectProperty walletServiceException = new SimpleObjectProperty<>(); - xmrInfoBinding = EasyBind.combine(xmrConnectionService.downloadPercentageProperty(), - xmrConnectionService.chainHeightProperty(), + xmrInfoBinding = EasyBind.combine( + xmrConnectionService.numUpdatesProperty(), // receives notification of any connection update xmrWalletService.downloadPercentageProperty(), xmrWalletService.walletHeightProperty(), walletServiceException, getWalletServiceErrorMsg(), - (chainDownloadPercentage, chainHeight, walletDownloadPercentage, walletHeight, exception, errorMsg) -> { + (numConnectionUpdates, walletDownloadPercentage, walletHeight, exception, errorMsg) -> { String result; if (exception == null && errorMsg == null) { @@ -137,9 +137,9 @@ public class WalletAppSetup { } else { // update daemon sync progress - double chainDownloadPercentageD = (double) chainDownloadPercentage; + double chainDownloadPercentageD = xmrConnectionService.downloadPercentageProperty().doubleValue(); xmrDaemonSyncProgress.set(chainDownloadPercentageD); - Long bestChainHeight = chainHeight == null ? null : (Long) chainHeight; + Long bestChainHeight = xmrConnectionService.chainHeightProperty().get(); String chainHeightAsString = bestChainHeight != null && bestChainHeight > 0 ? String.valueOf(bestChainHeight) : ""; if (chainDownloadPercentageD == 1) { String synchronizedWith = Res.get("mainView.footer.xmrInfo.connectedTo", getXmrDaemonNetworkAsString(), chainHeightAsString); diff --git a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java index 793717ed..0b8c25d5 100644 --- a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java +++ b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java @@ -32,11 +32,13 @@ import haveno.core.offer.OfferBookService; import haveno.core.offer.OpenOfferManager; import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager; import haveno.core.trade.HavenoUtils; +import haveno.core.trade.TradeManager; import haveno.core.xmr.setup.WalletsSetup; import haveno.core.xmr.wallet.BtcWalletService; import haveno.core.xmr.wallet.XmrWalletService; import haveno.network.p2p.NodeAddress; import haveno.network.p2p.P2PService; +import haveno.network.p2p.network.Connection; import haveno.network.p2p.seed.SeedNodeRepository; import lombok.extern.slf4j.Slf4j; @@ -93,11 +95,16 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable { try { if (injector != null) { - // notify trade protocols and wallets to prepare for shut down before shutting down + // notify trade protocols and wallets to prepare for shut down Set tasks = new HashSet(); tasks.add(() -> injector.getInstance(XmrWalletService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); - HavenoUtils.executeTasks(tasks); // notify in parallel + tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); + try { + HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout + } catch (Exception e) { + e.printStackTrace(); + } JsonFileManager.shutDownAllInstances(); injector.getInstance(ArbitratorManager.class).shutDown(); @@ -117,8 +124,12 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable { injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { module.close(injector); PersistenceManager.flushAllDataToDiskAtShutdown(() -> { - resultHandler.handleResult(); + log.info("Shutting down connections"); + Connection.shutDownExecutor(30); + + // done shutting down log.info("Graceful shutdown completed. Exiting now."); + resultHandler.handleResult(); UserThread.runAfter(() -> System.exit(HavenoExecutable.EXIT_SUCCESS), 1); }); }); diff --git a/core/src/main/java/haveno/core/offer/OfferFilterService.java b/core/src/main/java/haveno/core/offer/OfferFilterService.java index f9816582..67fb2fbc 100644 --- a/core/src/main/java/haveno/core/offer/OfferFilterService.java +++ b/core/src/main/java/haveno/core/offer/OfferFilterService.java @@ -229,11 +229,12 @@ public class OfferFilterService { Arbitrator thisArbitrator = user.getRegisteredArbitrator(); if (thisArbitrator != null && thisArbitrator.getNodeAddress().equals(offer.getOfferPayload().getArbitratorSigner())) { if (thisArbitrator.getNodeAddress().equals(p2PService.getNetworkNode().getNodeAddress())) arbitrator = thisArbitrator; // TODO: unnecessary to compare arbitrator and p2pservice address? + } else { + + // otherwise log warning that arbitrator is unregistered + List arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList()); + log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses); } - - // otherwise log warning - List arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList()); - log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses); } if (arbitrator == null) return false; // invalid arbitrator diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index 5a46de8c..66928a47 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -111,6 +111,7 @@ import static com.google.common.base.Preconditions.checkNotNull; public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener, PersistedDataHost { private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class); + private static final String THREAD_ID = OpenOfferManager.class.getSimpleName(); private static final long RETRY_REPUBLISH_DELAY_SEC = 10; private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 30; private static final long REPUBLISH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(40); @@ -307,6 +308,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } public void shutDown(@Nullable Runnable completeHandler) { + HavenoUtils.shutDownThreadId(THREAD_ID); stopped = true; p2PService.getPeerManager().removeListener(this); p2PService.removeDecryptedDirectMessageListener(this); @@ -403,56 +405,62 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe maybeUpdatePersistedOffers(); - // Republish means we send the complete offer object - republishOffers(); - startPeriodicRepublishOffersTimer(); + HavenoUtils.submitToThread(() -> { + + // Wait for prices to be available + priceFeedService.awaitPrices(); - // Refresh is started once we get a success from republish + // Republish means we send the complete offer object + republishOffers(); + startPeriodicRepublishOffersTimer(); - // We republish after a bit as it might be that our connected node still has the offer in the data map - // but other peers have it already removed because of expired TTL. - // Those other not directly connected peers would not get the broadcast of the new offer, as the first - // connected peer (seed node) does not broadcast if it has the data in the map. - // To update quickly to the whole network we repeat the republishOffers call after a few seconds when we - // are better connected to the network. There is no guarantee that all peers will receive it but we also - // have our periodic timer, so after that longer interval the offer should be available to all peers. - if (retryRepublishOffersTimer == null) - retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, - REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC); + // Refresh is started once we get a success from republish - p2PService.getPeerManager().addListener(this); + // We republish after a bit as it might be that our connected node still has the offer in the data map + // but other peers have it already removed because of expired TTL. + // Those other not directly connected peers would not get the broadcast of the new offer, as the first + // connected peer (seed node) does not broadcast if it has the data in the map. + // To update quickly to the whole network we repeat the republishOffers call after a few seconds when we + // are better connected to the network. There is no guarantee that all peers will receive it but we also + // have our periodic timer, so after that longer interval the offer should be available to all peers. + if (retryRepublishOffersTimer == null) + retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, + REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC); - // TODO: add to invalid offers on failure -// openOffers.stream() -// .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService) -// .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg)))); + p2PService.getPeerManager().addListener(this); - // process scheduled offers - processScheduledOffers((transaction) -> {}, (errorMessage) -> { - log.warn("Error processing unposted offers: " + errorMessage); - }); + // TODO: add to invalid offers on failure + // openOffers.stream() + // .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService) + // .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg)))); - // register to process unposted offers when unlocked balance increases - if (xmrWalletService.getWallet() != null) lastUnlockedBalance = xmrWalletService.getWallet().getUnlockedBalance(0); - xmrWalletService.addWalletListener(new MoneroWalletListener() { - @Override - public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { - if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) { - processScheduledOffers((transaction) -> {}, (errorMessage) -> { - log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post - }); + // process scheduled offers + processScheduledOffers((transaction) -> {}, (errorMessage) -> { + log.warn("Error processing unposted offers: " + errorMessage); + }); + + // register to process unposted offers when unlocked balance increases + if (xmrWalletService.getWallet() != null) lastUnlockedBalance = xmrWalletService.getWallet().getUnlockedBalance(0); + xmrWalletService.addWalletListener(new MoneroWalletListener() { + @Override + public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { + if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) { + processScheduledOffers((transaction) -> {}, (errorMessage) -> { + log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post + }); + } + lastUnlockedBalance = newUnlockedBalance; } - lastUnlockedBalance = newUnlockedBalance; + }); + + // initialize key image poller for signed offers + maybeInitializeKeyImagePoller(); + + // poll spent status of key images + for (SignedOffer signedOffer : signedOffers.getList()) { + signedOfferKeyImagePoller.addKeyImages(signedOffer.getReserveTxKeyImages()); } - }); - - // initialize key image poller for signed offers - maybeInitializeKeyImagePoller(); - - // poll spent status of key images - for (SignedOffer signedOffer : signedOffers.getList()) { - signedOfferKeyImagePoller.addKeyImages(signedOffer.getReserveTxKeyImages()); - } + }, THREAD_ID); } @@ -503,7 +511,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe OpenOffer openOffer = new OpenOffer(offer, triggerPrice, reserveExactAmount); // schedule or post offer - new Thread(() -> { + HavenoUtils.submitToThread(() -> { synchronized (processOffersLock) { CountDownLatch latch = new CountDownLatch(1); processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> { @@ -520,7 +528,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe }); HavenoUtils.awaitLatch(latch); } - }).start(); + }, THREAD_ID); } // Remove from offerbook @@ -804,7 +812,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler ErrorMessageHandler errorMessageHandler) { - new Thread(() -> { + HavenoUtils.submitToThread(() -> { synchronized (processOffersLock) { List errorMessages = new ArrayList(); List openOffers = getOpenOffers(); @@ -825,7 +833,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString()); else resultHandler.handleResult(null); } - }).start(); + }, THREAD_ID); } private void processUnpostedOffer(List openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { @@ -1569,8 +1577,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe stopPeriodicRefreshOffersTimer(); - priceFeedService.awaitPrices(); - new Thread(() -> { processListForRepublishOffers(getOpenOffers()); }).start(); @@ -1653,7 +1659,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, openOffer.getTriggerPrice()); // repost offer - new Thread(() -> { + HavenoUtils.submitToThread(() -> { synchronized (processOffersLock) { CountDownLatch latch = new CountDownLatch(1); processUnpostedOffer(getOpenOffers(), updatedOpenOffer, (transaction) -> { @@ -1670,7 +1676,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe }); HavenoUtils.awaitLatch(latch); } - }).start(); + }, THREAD_ID); } } diff --git a/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java b/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java index 4aaf31f2..5ac62d45 100644 --- a/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java +++ b/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java @@ -133,7 +133,7 @@ public class MakerSendSignOfferRequest extends Task { // if unavailable, try alternative arbitrator @Override public void onFault(String errorMessage) { - log.warn("Arbitrator {} unavailable: {}", arbitratorNodeAddress, errorMessage); + log.warn("Arbitrator unavailable: address={}: {}", arbitratorNodeAddress, errorMessage); excludedArbitrators.add(arbitratorNodeAddress); Arbitrator altArbitrator = DisputeAgentSelection.getRandomArbitrator(model.getArbitratorManager(), excludedArbitrators); if (altArbitrator == null) { diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index f20675f4..0772bd73 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -116,7 +116,7 @@ public final class ArbitrationManager extends DisputeManager { + HavenoUtils.submitToThread(() -> { if (message instanceof DisputeOpenedMessage) { handleDisputeOpenedMessage((DisputeOpenedMessage) message); } else if (message instanceof ChatMessage) { @@ -126,7 +126,7 @@ public final class ArbitrationManager extends DisputeManager runTask(String threadId, Runnable task) { + public static Future submitToPool(Runnable task) { + return POOL.submit(task); + } + + public static Future submitToSharedThread(Runnable task) { + return submitToThread(task, HavenoUtils.class.getSimpleName()); + } + + public static Future submitToThread(Runnable task, String threadId) { synchronized (POOLS) { if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1)); return POOLS.get(threadId).submit(task); } } - public static void removeThreadId(String threadId) { + public static void shutDownThreadId(String threadId) { synchronized (POOLS) { if (POOLS.containsKey(threadId)) { POOLS.get(threadId).shutdown(); @@ -489,33 +497,20 @@ public class HavenoUtils { } } - /** - * Submit tasks to a global thread pool. - */ - public static Future submitTask(Runnable task) { - return POOL.submit(task); + // TODO: update monero-java and replace with GenUtils.awaitTasks() + + public static List> awaitTasks(Collection tasks) { + return awaitTasks(tasks, tasks.size()); } - public static List> submitTasks(List tasks) { + public static List> awaitTasks(Collection tasks, int maxConcurrency) { + return awaitTasks(tasks, maxConcurrency, null); + } + + public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) { List> futures = new ArrayList<>(); - for (Runnable task : tasks) futures.add(submitTask(task)); - return futures; - } - - // TODO: replace with GenUtils.executeTasks() once monero-java updated - - public static void executeTasks(Collection tasks) { - executeTasks(tasks, tasks.size()); - } - - public static void executeTasks(Collection tasks, int maxConcurrency) { - executeTasks(tasks, maxConcurrency, null); - } - - public static void executeTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) { - if (tasks.isEmpty()) return; + if (tasks.isEmpty()) return futures; ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency); - List> futures = new ArrayList<>(); for (Runnable task : tasks) futures.add(pool.submit(task)); pool.shutdown(); @@ -535,6 +530,7 @@ public class HavenoUtils { } catch (Exception e) { throw new RuntimeException(e); } + return futures; } public static String toCamelCase(String underscore) { diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 7da8dec4..6e2cc05b 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -590,7 +590,7 @@ public abstract class Trade implements Tradable, Model { // handle daemon changes with max parallelization xmrWalletService.getConnectionService().addConnectionListener(newConnection -> { - HavenoUtils.submitTask(() -> onConnectionChanged(newConnection)); + HavenoUtils.submitToPool(() -> onConnectionChanged(newConnection)); }); // check if done @@ -1812,9 +1812,7 @@ public abstract class Trade implements Tradable, Model { // sync and reprocess messages on new thread if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) { - HavenoUtils.submitTask(() -> { - initSyncing(); - }); + new Thread(() -> initSyncing()).start(); } } } @@ -2053,7 +2051,7 @@ public abstract class Trade implements Tradable, Model { @Override public void onNewBlock(long height) { - HavenoUtils.submitTask(() -> { // allow rapid notifications + HavenoUtils.submitToThread(() -> { // allow rapid notifications // skip rapid succession blocks synchronized (this) { @@ -2087,7 +2085,7 @@ public abstract class Trade implements Tradable, Model { e.printStackTrace(); if (isInitialized && !isShutDownStarted && !isWalletConnected()) throw e; } - }); + }, getId()); } } diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index f8acdcaa..1fc048a2 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -235,7 +235,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); if (!(networkEnvelope instanceof TradeMessage)) return; String tradeId = ((TradeMessage) networkEnvelope).getTradeId(); - HavenoUtils.runTask(tradeId, () -> { + HavenoUtils.submitToThread(() -> { if (networkEnvelope instanceof InitTradeRequest) { handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); } else if (networkEnvelope instanceof InitMultisigRequest) { @@ -249,7 +249,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } else if (networkEnvelope instanceof DepositResponse) { handleDepositResponse((DepositResponse) networkEnvelope, peer); } - }); + }, tradeId); } @@ -316,7 +316,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); try { - HavenoUtils.executeTasks(tasks); + HavenoUtils.awaitTasks(tasks); } catch (Exception e) { log.warn("Error notifying trades that shut down started: {}", e.getMessage()); e.printStackTrace(); @@ -346,7 +346,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); try { - HavenoUtils.executeTasks(tasks); + HavenoUtils.awaitTasks(tasks); } catch (Exception e) { log.warn("Error shutting down trades: {}", e.getMessage()); e.printStackTrace(); @@ -443,7 +443,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); }; - HavenoUtils.executeTasks(tasks, threadPoolSize); + HavenoUtils.awaitTasks(tasks, threadPoolSize); log.info("Done initializing persisted trades"); if (isShutDownStarted) return; @@ -452,7 +452,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // sync idle trades once in background after active trades for (Trade trade : trades) { - if (trade.isIdling()) HavenoUtils.submitTask(() -> trade.syncAndPollWallet()); + if (trade.isIdling()) HavenoUtils.submitToPool(() -> trade.syncAndPollWallet()); } // process after all wallets initialized @@ -1205,7 +1205,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // remove trade tradableList.remove(trade); - HavenoUtils.removeThreadId(trade.getId()); + HavenoUtils.shutDownThreadId(trade.getId()); // unregister and persist p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade)); diff --git a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java index 68602343..cd668449 100644 --- a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java +++ b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java @@ -104,6 +104,7 @@ public class XmrWalletService { private static final int MONERO_LOG_LEVEL = 0; private static final int MAX_SYNC_ATTEMPTS = 3; private static final boolean PRINT_STACK_TRACE = false; + private static final String THREAD_ID = XmrWalletService.class.getSimpleName(); private final Preferences preferences; private final CoreAccountService accountService; @@ -668,9 +669,6 @@ public class XmrWalletService { wallet.removeListener(listener); } } - - // prepare trades for shut down - if (tradeManager != null) tradeManager.onShutDownStarted(); } public void shutDown() { @@ -681,7 +679,7 @@ public class XmrWalletService { List tasks = new ArrayList(); if (tradeManager != null) tasks.add(() -> tradeManager.shutDown()); tasks.add(() -> closeMainWallet(true)); - HavenoUtils.executeTasks(tasks); + HavenoUtils.awaitTasks(tasks); log.info("Done shutting down all wallets"); } @@ -767,12 +765,12 @@ public class XmrWalletService { // reschedule to init main wallet UserThread.runAfter(() -> { - new Thread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS)).start(); + HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID); }, xmrConnectionService.getRefreshPeriodMs() / 1000); } else { log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000); UserThread.runAfter(() -> { - new Thread(() -> maybeInitMainWallet(true, numAttempts - 1)).start(); + HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID); }, xmrConnectionService.getRefreshPeriodMs() / 1000); } } @@ -803,20 +801,22 @@ public class XmrWalletService { } private void updateSyncProgress() { - walletHeight.set(wallet.getHeight()); + UserThread.await(() -> { + walletHeight.set(wallet.getHeight()); - // new wallet reports height 1 before synced - if (wallet.getHeight() == 1) { - downloadListener.progress(.0001, xmrConnectionService.getTargetHeight(), null); // >0% shows progress bar - return; - } + // new wallet reports height 1 before synced + if (wallet.getHeight() == 1) { + downloadListener.progress(.0001, xmrConnectionService.getTargetHeight(), null); // >0% shows progress bar + return; + } - // set progress - long targetHeight = xmrConnectionService.getTargetHeight(); - long blocksLeft = targetHeight - walletHeight.get(); - if (syncStartHeight == null) syncStartHeight = walletHeight.get(); - double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, walletHeight.get() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress - downloadListener.progress(percent, blocksLeft, null); + // set progress + long targetHeight = xmrConnectionService.getTargetHeight(); + long blocksLeft = targetHeight - walletHeight.get(); + if (syncStartHeight == null) syncStartHeight = walletHeight.get(); + double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, walletHeight.get() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress + downloadListener.progress(percent, blocksLeft, null); + }); } private MoneroWalletRpc createWalletRpc(MoneroWalletConfig config, Integer port) { @@ -938,14 +938,16 @@ public class XmrWalletService { // sync wallet on new thread if (connection != null) { wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE); - new Thread(() -> { - try { - if (!Boolean.FALSE.equals(connection.isConnected())) wallet.sync(); - wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs()); - } catch (Exception e) { - log.warn("Failed to sync main wallet after setting daemon connection: " + e.getMessage()); + HavenoUtils.submitToThread(() -> { + synchronized (walletLock) { + try { + if (Boolean.TRUE.equals(connection.isConnected())) wallet.sync(); + wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs()); + } catch (Exception e) { + log.warn("Failed to sync main wallet after setting daemon connection: " + e.getMessage()); + } } - }).start(); + }, THREAD_ID); } log.info("Done setting main wallet daemon connection: " + (connection == null ? null : connection.getUri())); @@ -977,7 +979,7 @@ public class XmrWalletService { } // excute tasks in parallel - HavenoUtils.executeTasks(tasks, Math.min(10, 1 + trades.size())); + HavenoUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size())); log.info("Done changing all wallet passwords"); } @@ -1259,17 +1261,14 @@ public class XmrWalletService { BigInteger balance; if (balanceListener.getSubaddressIndex() != null && balanceListener.getSubaddressIndex() != 0) balance = getBalanceForSubaddress(balanceListener.getSubaddressIndex()); else balance = getAvailableBalance(); - UserThread.execute(new Runnable() { // TODO (woodser): don't execute on UserThread - @Override - public void run() { - try { - balanceListener.onBalanceChanged(balance); - } catch (Exception e) { - log.warn("Failed to notify balance listener of change"); - e.printStackTrace(); - } + HavenoUtils.submitToThread(() -> { + try { + balanceListener.onBalanceChanged(balance); + } catch (Exception e) { + log.warn("Failed to notify balance listener of change"); + e.printStackTrace(); } - }); + }, THREAD_ID); } } @@ -1313,54 +1312,39 @@ public class XmrWalletService { @Override public void onSyncProgress(long height, long startHeight, long endHeight, double percentDone, String message) { - UserThread.execute(new Runnable() { - @Override - public void run() { - for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message); - } - }); + HavenoUtils.submitToThread(() -> { + for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message); + }, THREAD_ID); } @Override public void onNewBlock(long height) { - UserThread.execute(new Runnable() { - @Override - public void run() { - walletHeight.set(height); - for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height); - } - }); + HavenoUtils.submitToThread(() -> { + walletHeight.set(height); + for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height); + }, THREAD_ID); } @Override public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) { - UserThread.execute(new Runnable() { - @Override - public void run() { - for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance); - updateBalanceListeners(); - } - }); + HavenoUtils.submitToThread(() -> { + for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance); + updateBalanceListeners(); + }, THREAD_ID); } @Override public void onOutputReceived(MoneroOutputWallet output) { - UserThread.execute(new Runnable() { - @Override - public void run() { - for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output); - } - }); + HavenoUtils.submitToThread(() -> { + for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output); + }, THREAD_ID); } @Override public void onOutputSpent(MoneroOutputWallet output) { - UserThread.execute(new Runnable() { - @Override - public void run() { - for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output); - } - }); + HavenoUtils.submitToThread(() -> { + for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output); + }, THREAD_ID); } } } diff --git a/desktop/src/main/java/haveno/desktop/main/funds/deposit/DepositListItem.java b/desktop/src/main/java/haveno/desktop/main/funds/deposit/DepositListItem.java index c7de866d..fd492912 100644 --- a/desktop/src/main/java/haveno/desktop/main/funds/deposit/DepositListItem.java +++ b/desktop/src/main/java/haveno/desktop/main/funds/deposit/DepositListItem.java @@ -20,6 +20,7 @@ package haveno.desktop.main.funds.deposit; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import haveno.common.UserThread; import haveno.core.locale.Res; import haveno.core.trade.HavenoUtils; import haveno.core.util.coin.CoinFormatter; @@ -65,9 +66,11 @@ class DepositListItem { balanceListener = new XmrBalanceListener(addressEntry.getSubaddressIndex()) { @Override public void onBalanceChanged(BigInteger balance) { - DepositListItem.this.balanceAsBI = balance; - DepositListItem.this.balance.set(HavenoUtils.formatXmr(balanceAsBI)); - updateUsage(addressEntry.getSubaddressIndex(), null); + UserThread.execute(() -> { + DepositListItem.this.balanceAsBI = balance; + DepositListItem.this.balance.set(HavenoUtils.formatXmr(balanceAsBI)); + updateUsage(addressEntry.getSubaddressIndex(), null); + }); } }; xmrWalletService.addBalanceListener(balanceListener); diff --git a/desktop/src/main/java/haveno/desktop/main/offer/takeoffer/TakeOfferView.java b/desktop/src/main/java/haveno/desktop/main/offer/takeoffer/TakeOfferView.java index 38828f70..8c29f691 100644 --- a/desktop/src/main/java/haveno/desktop/main/offer/takeoffer/TakeOfferView.java +++ b/desktop/src/main/java/haveno/desktop/main/offer/takeoffer/TakeOfferView.java @@ -902,13 +902,15 @@ public class TakeOfferView extends ActivatableViewAndModel { - if (newValue) { - fundingHBox.getChildren().remove(cancelButton2); - takeOfferBox.getChildren().add(cancelButton2); - } else if (!fundingHBox.getChildren().contains(cancelButton2)) { - takeOfferBox.getChildren().remove(cancelButton2); - fundingHBox.getChildren().add(cancelButton2); - } + UserThread.execute(() -> { + if (newValue) { + fundingHBox.getChildren().remove(cancelButton2); + takeOfferBox.getChildren().add(cancelButton2); + } else if (!fundingHBox.getChildren().contains(cancelButton2)) { + takeOfferBox.getChildren().remove(cancelButton2); + fundingHBox.getChildren().add(cancelButton2); + } + }); }); cancelButton2 = new AutoTooltipButton(Res.get("shared.cancel")); diff --git a/p2p/src/main/java/haveno/network/p2p/network/Connection.java b/p2p/src/main/java/haveno/network/p2p/network/Connection.java index bd887e2c..809fb7c4 100644 --- a/p2p/src/main/java/haveno/network/p2p/network/Connection.java +++ b/p2p/src/main/java/haveno/network/p2p/network/Connection.java @@ -74,6 +74,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -109,6 +110,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { //TODO decrease limits again after testing private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(240); private static final int SHUTDOWN_TIMEOUT = 100; + private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); // one shared thread to handle messages sequentially public static int getPermittedMessageSize() { return PERMITTED_MESSAGE_SIZE; @@ -122,6 +124,17 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { return SHUTDOWN_TIMEOUT; } + public static void shutDownExecutor(int timeoutSeconds) { + try { + EXECUTOR.shutdown(); + if (!EXECUTOR.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) EXECUTOR.shutdownNow(); + } catch (InterruptedException e) { + EXECUTOR.shutdownNow(); + e.printStackTrace(); + log.warn("Error shutting down connection executor: " + e.getMessage()); + } + }; + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields /////////////////////////////////////////////////////////////////////////////////////////// @@ -211,7 +224,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { reportInvalidRequest(RuleViolation.PEER_BANNED); } } - UserThread.execute(() -> connectionListener.onConnection(this)); + EXECUTOR.execute(() -> connectionListener.onConnection(this)); } catch (Throwable e) { handleException(e); } @@ -266,8 +279,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { if (!stopped) { protoOutputStream.writeEnvelope(networkEnvelope); - UserThread.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this))); - UserThread.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize)); + EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this))); + EXECUTOR.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize)); } } catch (Throwable t) { handleException(t); @@ -396,7 +409,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { if (networkEnvelope instanceof BundleOfEnvelopes) { onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection); } else { - UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); + EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); } } @@ -432,7 +445,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { envelopesToProcess.add(networkEnvelope); } } - envelopesToProcess.forEach(envelope -> UserThread.execute(() -> + envelopesToProcess.forEach(envelope -> EXECUTOR.execute(() -> messageListeners.forEach(listener -> listener.onMessage(envelope, connection)))); } @@ -516,7 +529,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { } private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) { - // Use UserThread.execute as it's not clear if that is called from a non-UserThread UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this)); try { protoOutputStream.onConnectionShutdown(); @@ -539,7 +551,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { Utilities.shutdownAndAwaitTermination(executorService, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); log.debug("Connection shutdown complete {}", this); - // Use UserThread.execute as it's not clear if that is called from a non-UserThread if (shutDownCompleteHandler != null) UserThread.execute(shutDownCompleteHandler); } @@ -847,8 +858,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { networkEnvelope.getClass().getSimpleName(), uid); } - onMessage(networkEnvelope, this); - UserThread.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size)); + EXECUTOR.execute(() -> onMessage(networkEnvelope, this)); + EXECUTOR.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size)); } } catch (InvalidClassException e) { log.error(e.getMessage()); @@ -897,7 +908,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { capabilitiesListeners.forEach(weakListener -> { SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get(); if (supportedCapabilitiesListener != null) { - UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities)); + EXECUTOR.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities)); } }); return false;