From c06a85b929bd2cbb9e362f76b55a6e5668cae4b3 Mon Sep 17 00:00:00 2001 From: woodser Date: Fri, 5 Jan 2024 13:23:52 -0500 Subject: [PATCH] clear and shut down inactive trades, move more to trade threads --- .../main/java/haveno/common/ThreadUtils.java | 60 ++++++++++-------- .../main/java/haveno/common/UserThread.java | 6 +- .../arbitration/ArbitrationManager.java | 5 +- .../main/java/haveno/core/trade/Trade.java | 62 +++++++++---------- .../java/haveno/core/trade/TradeManager.java | 6 +- .../core/trade/protocol/BuyerProtocol.java | 5 +- .../core/trade/protocol/SellerProtocol.java | 5 +- .../core/trade/protocol/TradeProtocol.java | 2 + .../network/p2p/peers/BroadcastHandler.java | 6 +- 9 files changed, 82 insertions(+), 75 deletions(-) diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index c4cb5c4b..46fe3f4b 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; public class ThreadUtils { private static final Map EXECUTORS = new HashMap<>(); - private static final Map THREAD_BY_ID = new HashMap<>(); + private static final Map THREADS = new HashMap<>(); private static final int POOL_SIZE = 10; private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); @@ -40,8 +40,8 @@ public class ThreadUtils { synchronized (EXECUTORS) { if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1)); EXECUTORS.get(threadId).execute(() -> { - synchronized (THREAD_BY_ID) { - THREAD_BY_ID.put(threadId, Thread.currentThread()); + synchronized (THREADS) { + THREADS.put(threadId, Thread.currentThread()); } command.run(); }); @@ -53,8 +53,10 @@ public class ThreadUtils { command.run(); } else { CountDownLatch latch = new CountDownLatch(1); - execute(command, threadId); // run task - execute(() -> latch.countDown(), threadId); // await next tick + execute(() -> { + command.run(); + latch.countDown(); + }, threadId); try { latch.await(); } catch (InterruptedException e) { @@ -64,25 +66,29 @@ public class ThreadUtils { } public static void shutDown(String threadId, long timeoutMs) { - ExecutorService pool = null; - synchronized (EXECUTORS) { - if (!EXECUTORS.containsKey(threadId)) return; // thread not found - pool = EXECUTORS.get(threadId); - } - pool.shutdown(); - try { - if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); - } catch (InterruptedException e) { - pool.shutdownNow(); - throw new RuntimeException(e); - } finally { - synchronized (EXECUTORS) { - EXECUTORS.remove(threadId); - } - synchronized (THREAD_BY_ID) { - THREAD_BY_ID.remove(threadId); - } - } + ExecutorService pool = null; + synchronized (EXECUTORS) { + if (!EXECUTORS.containsKey(threadId)) return; // thread not found + pool = EXECUTORS.get(threadId); + } + pool.shutdown(); + try { + if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); + } catch (InterruptedException e) { + pool.shutdownNow(); + throw new RuntimeException(e); + } finally { + remove(threadId); + } + } + + public static void remove(String threadId) { + synchronized (EXECUTORS) { + EXECUTORS.remove(threadId); + } + synchronized (THREADS) { + THREADS.remove(threadId); + } } public static Future submitToPool(Runnable task) { @@ -136,9 +142,9 @@ public class ThreadUtils { } private static boolean isCurrentThread(Thread thread, String threadId) { - synchronized (THREAD_BY_ID) { - if (!THREAD_BY_ID.containsKey(threadId)) return false; - return thread == THREAD_BY_ID.get(threadId); + synchronized (THREADS) { + if (!THREADS.containsKey(threadId)) return false; + return thread == THREADS.get(threadId); } } } diff --git a/common/src/main/java/haveno/common/UserThread.java b/common/src/main/java/haveno/common/UserThread.java index cacc16bd..3055a200 100644 --- a/common/src/main/java/haveno/common/UserThread.java +++ b/common/src/main/java/haveno/common/UserThread.java @@ -71,8 +71,10 @@ public class UserThread { command.run(); } else { CountDownLatch latch = new CountDownLatch(1); - execute(command); // run task - execute(() -> latch.countDown()); // await next tick + execute(() -> { + command.run(); + latch.countDown(); + }); try { latch.await(); } catch (InterruptedException e) { diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index e0b25185..58694332 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -331,7 +331,8 @@ public final class ArbitrationManager extends DisputeManager { + if (trade.isShutDownStarted()) return; + ThreadUtils.execute(() -> { synchronized (trade) { // skip if no need to reprocess @@ -342,7 +343,7 @@ public final class ArbitrationManager extends DisputeManager { if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); + // check if done + if (isPayoutUnlocked()) { + clearAndShutDown(); + return; + } + // set arbitrator pub key ring once known serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); @@ -603,12 +609,6 @@ public abstract class Trade implements Tradable, Model { }); }); - // check if done - if (isPayoutUnlocked()) { - maybeClearProcessData(); - return; - } - // reset buyer's payment sent state if no ack receive if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); @@ -623,6 +623,7 @@ public abstract class Trade implements Tradable, Model { // handle trade state events tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { + if (isShutDownStarted) return; ThreadUtils.execute(() -> { if (newValue == Trade.State.MULTISIG_COMPLETED) { updateWalletRefreshPeriod(); @@ -633,6 +634,7 @@ public abstract class Trade implements Tradable, Model { // handle trade phase events tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { + if (isShutDownStarted) return; ThreadUtils.execute(() -> { if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); if (isPaymentReceived()) { @@ -648,6 +650,7 @@ public abstract class Trade implements Tradable, Model { // handle payout events payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { + if (isShutDownStarted) return; ThreadUtils.execute(() -> { if (isPayoutPublished()) updateWalletRefreshPeriod(); @@ -679,21 +682,7 @@ public abstract class Trade implements Tradable, Model { if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { if (!isInitialized) return; log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); - ThreadUtils.execute(() -> { - deleteWallet(); - maybeClearProcessData(); - if (idlePayoutSyncer != null) { - xmrWalletService.removeWalletListener(idlePayoutSyncer); - idlePayoutSyncer = null; - } - UserThread.execute(() -> { - if (payoutStateSubscription != null) { - payoutStateSubscription.unsubscribe(); - payoutStateSubscription = null; - } - }); - }, getId()); - + clearAndShutDown(); } }, getId()); }); @@ -912,7 +901,7 @@ public abstract class Trade implements Tradable, Model { throw new RuntimeException("Refusing to delete wallet for " + getClass().getSimpleName() + " " + getId() + " because it has a balance"); } - // force stop the wallet + // force stop wallet if (wallet != null) stopWallet(); // delete wallet @@ -1195,21 +1184,24 @@ public abstract class Trade implements Tradable, Model { return payoutAmountFromMediation < normalPayoutAmount; } - public void maybeClearProcessData() { + public void clearAndShutDown() { + ThreadUtils.execute(() -> clearProcessData(), getId()); + ThreadUtils.submitToPool(() -> shutDown()); // run off trade thread + } + + private void clearProcessData() { + + // delete trade wallet synchronized (walletLock) { - - // skip if already cleared - if (!walletExists()) return; - - // delete trade wallet + if (!walletExists()) return; // done if already cleared deleteWallet(); + } - // TODO: clear other process data - setPayoutTxHex(null); - for (TradePeer peer : getPeers()) { - peer.setUnsignedPayoutTxHex(null); - peer.setUpdatedMultisigHex(null); - } + // TODO: clear other process data + setPayoutTxHex(null); + for (TradePeer peer : getPeers()) { + peer.setUnsignedPayoutTxHex(null); + peer.setUpdatedMultisigHex(null); } } @@ -1230,6 +1222,8 @@ public abstract class Trade implements Tradable, Model { } public void shutDown() { + if (isShutDown) return; // ignore if already shut down + isShutDownStarted = true; if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId()); // shut down thread pools with timeout diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 53374559..03c1783c 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -1262,10 +1262,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // remove trade from list removeTrade(trade); - - // delete trade wallet - if (trade.walletExists()) trade.deleteWallet(); } + + // clear and shut down trade + trade.clearAndShutDown(); } private void listenForCleanup(Trade trade) { diff --git a/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java index 64812122..ebb44b24 100644 --- a/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/BuyerProtocol.java @@ -17,6 +17,7 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.common.handlers.ResultHandler; import haveno.core.trade.BuyerTrade; @@ -97,7 +98,7 @@ public class BuyerProtocol extends DisputeProtocol { public void onPaymentSent(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { System.out.println("BuyerProtocol.onPaymentSent()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.errorMessageHandler = errorMessageHandler; @@ -127,7 +128,7 @@ public class BuyerProtocol extends DisputeProtocol { } awaitTradeLatch(); } - }).start(); + }, trade.getId()); } @SuppressWarnings("unchecked") diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java index c6802aa3..add37160 100644 --- a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java @@ -17,6 +17,7 @@ package haveno.core.trade.protocol; +import haveno.common.ThreadUtils; import haveno.common.handlers.ErrorMessageHandler; import haveno.common.handlers.ResultHandler; import haveno.core.trade.SellerTrade; @@ -94,7 +95,7 @@ public class SellerProtocol extends DisputeProtocol { public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { log.info("SellerProtocol.onPaymentReceived()"); - new Thread(() -> { + ThreadUtils.execute(() -> { synchronized (trade) { latchTrade(); this.errorMessageHandler = errorMessageHandler; @@ -123,7 +124,7 @@ public class SellerProtocol extends DisputeProtocol { } awaitTradeLatch(); } - }).start(); + }, trade.getId()); } @SuppressWarnings("unchecked") diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 0fc2b18f..f04f9f1f 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -265,6 +265,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } public void maybeSendDepositsConfirmedMessages() { + if (!trade.isInitialized() || trade.isShutDownStarted()) return; ThreadUtils.execute(() -> { if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return; synchronized (trade) { @@ -286,6 +287,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { + if (trade.isShutDownStarted()) return; ThreadUtils.execute(() -> { synchronized (trade) { diff --git a/p2p/src/main/java/haveno/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/haveno/network/p2p/peers/BroadcastHandler.java index 8dcab4b5..eb2ece9e 100644 --- a/p2p/src/main/java/haveno/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/haveno/network/p2p/peers/BroadcastHandler.java @@ -276,13 +276,13 @@ public class BroadcastHandler implements PeerManager.Listener { @Override public void onFailure(@NotNull Throwable throwable) { - log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable); - numOfFailedBroadcasts.incrementAndGet(); - if (stopped.get()) { return; } + log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable); + numOfFailedBroadcasts.incrementAndGet(); + maybeNotifyListeners(broadcastRequestsForConnection); checkForCompletion(); }