repeatedly acquire lock on trade shut down and fix timeout

This commit is contained in:
woodser 2024-01-09 12:01:01 -05:00
parent 5cc53a82e3
commit db155283be
6 changed files with 56 additions and 36 deletions

View file

@ -65,12 +65,17 @@ public class ThreadUtils {
} }
} }
public static void shutDown(String threadId, long timeoutMs) { public static void shutDown(String threadId) {
shutDown(threadId, null);
}
public static void shutDown(String threadId, Long timeoutMs) {
if (timeoutMs == null) timeoutMs = Long.MAX_VALUE;
ExecutorService pool = null; ExecutorService pool = null;
synchronized (EXECUTORS) { synchronized (EXECUTORS) {
if (!EXECUTORS.containsKey(threadId)) return; // thread not found
pool = EXECUTORS.get(threadId); pool = EXECUTORS.get(threadId);
} }
if (pool == null) return; // thread not found
pool.shutdown(); pool.shutdown();
try { try {
if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow();
@ -101,10 +106,12 @@ public class ThreadUtils {
return futures; return futures;
} }
// TODO: these are unused; remove? use monero-java awaitTasks() when updated
public static Future<?> awaitTask(Runnable task) { public static Future<?> awaitTask(Runnable task) {
return awaitTasks(Arrays.asList(task)).get(0); return awaitTask(task, null);
}
public static Future<?> awaitTask(Runnable task, Long timeoutMs) {
return awaitTasks(Arrays.asList(task), 1, timeoutMs).get(0);
} }
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks) { public static List<Future<?>> awaitTasks(Collection<Runnable> tasks) {
@ -115,30 +122,20 @@ public class ThreadUtils {
return awaitTasks(tasks, maxConcurrency, null); return awaitTasks(tasks, maxConcurrency, null);
} }
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutSeconds) { public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutMs) {
if (timeoutMs == null) timeoutMs = Long.MAX_VALUE;
if (tasks.isEmpty()) return new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
try {
List<Future<?>> futures = new ArrayList<>(); List<Future<?>> futures = new ArrayList<>();
if (tasks.isEmpty()) return futures; for (Runnable task : tasks) futures.add(executorService.submit(task, null));
ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency); for (Future<?> future : futures) future.get(timeoutMs, TimeUnit.MILLISECONDS);
for (Runnable task : tasks) futures.add(pool.submit(task)); return futures;
pool.shutdown();
// interrupt after timeout
if (timeoutSeconds != null) {
try {
if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) pool.shutdownNow();
} catch (InterruptedException e) {
pool.shutdownNow();
throw new RuntimeException(e);
}
}
// throw exception from any tasks
try {
for (Future<?> future : futures) future.get();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
executorService.shutdownNow();
} }
return futures;
} }
private static boolean isCurrentThread(Thread thread, String threadId) { private static boolean isCurrentThread(Thread thread, String threadId) {

View file

@ -341,7 +341,7 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try { try {
ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout ThreadUtils.awaitTasks(tasks, tasks.size(), 120000l); // run in parallel with timeout
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View file

@ -104,7 +104,7 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable {
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted()); tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try { try {
ThreadUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout ThreadUtils.awaitTasks(tasks, tasks.size(), 120000l); // run in parallel with timeout
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View file

@ -1226,14 +1226,31 @@ public abstract class Trade implements Tradable, Model {
isShutDownStarted = true; isShutDownStarted = true;
if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId()); if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());
// shut down thread pools with timeout // create task to shut down trade
List<Runnable> tasks = new ArrayList<>(); Runnable shutDownTask = () -> {
tasks.add(() -> ThreadUtils.shutDown(getId(), SHUTDOWN_TIMEOUT_MS));
tasks.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId(), SHUTDOWN_TIMEOUT_MS)); // repeatedly acquire lock to clear tasks
for (int i = 0; i < 20; i++) {
synchronized (this) {
GenUtils.waitFor(10);
}
}
// shut down trade threads
synchronized (this) {
List<Runnable> shutDownThreads = new ArrayList<>();
shutDownThreads.add(() -> ThreadUtils.shutDown(getId()));
shutDownThreads.add(() -> ThreadUtils.shutDown(getConnectionChangedThreadId()));
ThreadUtils.awaitTasks(shutDownThreads);
}
};
// shut down trade with timeout
try { try {
ThreadUtils.awaitTasks(tasks); ThreadUtils.awaitTask(shutDownTask, SHUTDOWN_TIMEOUT_MS);
} catch (Exception e) { } catch (Exception e) {
log.warn("Timeout shutting down {} {}", getClass().getSimpleName(), getId()); log.warn("Error shutting down {} {}: {}", getClass().getSimpleName(), getId(), e.getMessage());
e.printStackTrace();
// force stop wallet // force stop wallet
if (wallet != null) { if (wallet != null) {
@ -1251,8 +1268,13 @@ public abstract class Trade implements Tradable, Model {
idlePayoutSyncer = null; idlePayoutSyncer = null;
} }
if (wallet != null) { if (wallet != null) {
try {
xmrWalletService.saveWallet(wallet, false); // skip backup xmrWalletService.saveWallet(wallet, false); // skip backup
stopWallet(); stopWallet();
} catch (Exception e) {
// warning will be logged for main wallet, so skip logging here
//log.warn("Error closing monero-wallet-rpc subprocess for {} {}: {}. Was Haveno stopped manually with ctrl+c?", getClass().getSimpleName(), getId(), e.getMessage());
}
} }
UserThread.execute(() -> { UserThread.execute(() -> {
if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe(); if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe();

View file

@ -952,6 +952,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
private void updateTradePeriodState() { private void updateTradePeriodState() {
if (isShutDownStarted) return;
for (Trade trade : new ArrayList<Trade>(tradableList.getList())) { for (Trade trade : new ArrayList<Trade>(tradableList.getList())) {
if (!trade.isPayoutPublished()) { if (!trade.isPayoutPublished()) {
Date maxTradePeriodDate = trade.getMaxTradePeriodDate(); Date maxTradePeriodDate = trade.getMaxTradePeriodDate();

View file

@ -1047,7 +1047,7 @@ public class XmrWalletService {
wallet = null; wallet = null;
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("Error closing main monero-wallet-rpc subprocess: " + e.getMessage() + ". Was Haveno stopped manually with ctrl+c?"); log.warn("Error closing main monero-wallet-rpc subprocess: {}. Was Haveno stopped manually with ctrl+c?", e.getMessage());
} }
} }
} }