diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index 46fe3f4b..c2b9aad9 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -65,12 +65,17 @@ public class ThreadUtils { } } - public static void shutDown(String threadId, long timeoutMs) { + public static void shutDown(String threadId) { + shutDown(threadId, null); + } + + public static void shutDown(String threadId, Long timeoutMs) { + if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; ExecutorService pool = null; synchronized (EXECUTORS) { - if (!EXECUTORS.containsKey(threadId)) return; // thread not found pool = EXECUTORS.get(threadId); } + if (pool == null) return; // thread not found pool.shutdown(); try { if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); @@ -101,10 +106,12 @@ public class ThreadUtils { return futures; } - // TODO: these are unused; remove? use monero-java awaitTasks() when updated - public static Future awaitTask(Runnable task) { - return awaitTasks(Arrays.asList(task)).get(0); + return awaitTask(task, null); + } + + public static Future awaitTask(Runnable task, Long timeoutMs) { + return awaitTasks(Arrays.asList(task), 1, timeoutMs).get(0); } public static List> awaitTasks(Collection tasks) { @@ -115,30 +122,20 @@ public class ThreadUtils { return awaitTasks(tasks, maxConcurrency, null); } - public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutSeconds) { - List> futures = new ArrayList<>(); - if (tasks.isEmpty()) return futures; - ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency); - for (Runnable task : tasks) futures.add(pool.submit(task)); - pool.shutdown(); - - // interrupt after timeout - if (timeoutSeconds != null) { - try { - if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow(); - } catch (InterruptedException e) { - pool.shutdownNow(); - throw new RuntimeException(e); - } - } - - // throw exception from any tasks + public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutMs) { + if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; + if (tasks.isEmpty()) return new ArrayList<>(); + ExecutorService executorService = Executors.newFixedThreadPool(tasks.size()); try { - for (Future future : futures) future.get(); + List> futures = new ArrayList<>(); + for (Runnable task : tasks) futures.add(executorService.submit(task, null)); + for (Future future : futures) future.get(timeoutMs, TimeUnit.MILLISECONDS); + return futures; } catch (Exception e) { throw new RuntimeException(e); + } finally { + executorService.shutdownNow(); } - return futures; } private static boolean isCurrentThread(Thread thread, String threadId) { diff --git a/core/src/main/java/haveno/core/app/HavenoExecutable.java b/core/src/main/java/haveno/core/app/HavenoExecutable.java index cbbc9b31..77571041 100644 --- a/core/src/main/java/haveno/core/app/HavenoExecutable.java +++ b/core/src/main/java/haveno/core/app/HavenoExecutable.java @@ -341,7 +341,7 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); try { - ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout + ThreadUtils.awaitTasks(tasks, tasks.size(), 120000l); // run in parallel with timeout } catch (Exception e) { e.printStackTrace(); } diff --git a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java index ea44c0b3..e102dfc3 100644 --- a/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java +++ b/core/src/main/java/haveno/core/app/misc/ExecutableForAppWithP2p.java @@ -104,7 +104,7 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable { tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); try { - ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout + ThreadUtils.awaitTasks(tasks, tasks.size(), 120000l); // run in parallel with timeout } catch (Exception e) { e.printStackTrace(); } diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 7473245e..591de631 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -1226,14 +1226,31 @@ public abstract class Trade implements Tradable, Model { isShutDownStarted = true; if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId()); - // shut down thread pools with timeout - List tasks = new ArrayList<>(); - tasks.add(() -> ThreadUtils.shutDown(getId(), SHUTDOWN_TIMEOUT_MS)); - tasks.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId(), SHUTDOWN_TIMEOUT_MS)); + // create task to shut down trade + Runnable shutDownTask = () -> { + + // repeatedly acquire lock to clear tasks + for (int i = 0; i < 20; i++) { + synchronized (this) { + GenUtils.waitFor(10); + } + } + + // shut down trade threads + synchronized (this) { + List shutDownThreads = new ArrayList<>(); + shutDownThreads.add(() -> ThreadUtils.shutDown(getId())); + shutDownThreads.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId())); + ThreadUtils.awaitTasks(shutDownThreads); + } + }; + + // shut down trade with timeout try { - ThreadUtils.awaitTasks(tasks); + ThreadUtils.awaitTask(shutDownTask, SHUTDOWN_TIMEOUT_MS); } catch (Exception e) { - log.warn("Timeout shutting down {} {}", getClass().getSimpleName(), getId()); + log.warn("Error shutting down {} {}: {}", getClass().getSimpleName(), getId(), e.getMessage()); + e.printStackTrace(); // force stop wallet if (wallet != null) { @@ -1251,8 +1268,13 @@ public abstract class Trade implements Tradable, Model { idlePayoutSyncer = null; } if (wallet != null) { - xmrWalletService.saveWallet(wallet, false); // skip backup - stopWallet(); + try { + xmrWalletService.saveWallet(wallet, false); // skip backup + stopWallet(); + } catch (Exception e) { + // warning will be logged for main wallet, so skip logging here + //log.warn("Error closing monero-wallet-rpc subprocess for {} {}: {}. Was Haveno stopped manually with ctrl+c?", getClass().getSimpleName(), getId(), e.getMessage()); + } } UserThread.execute(() -> { if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe(); diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 03c1783c..eaa610b3 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -952,6 +952,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } private void updateTradePeriodState() { + if (isShutDownStarted) return; for (Trade trade : new ArrayList(tradableList.getList())) { if (!trade.isPayoutPublished()) { Date maxTradePeriodDate = trade.getMaxTradePeriodDate(); diff --git a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java index a737eb18..4da9bee7 100644 --- a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java +++ b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java @@ -1047,7 +1047,7 @@ public class XmrWalletService { wallet = null; } } catch (Exception e) { - log.warn("Error closing main monero-wallet-rpc subprocess: " + e.getMessage() + ". Was Haveno stopped manually with ctrl+c?"); + log.warn("Error closing main monero-wallet-rpc subprocess: {}. Was Haveno stopped manually with ctrl+c?", e.getMessage()); } } }