clear and shut down inactive trades, move more to trade threads

This commit is contained in:
woodser 2024-01-05 13:23:52 -05:00
parent c14f37b595
commit c06a85b929
9 changed files with 82 additions and 75 deletions

View file

@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
public class ThreadUtils { public class ThreadUtils {
private static final Map<String, ExecutorService> EXECUTORS = new HashMap<>(); private static final Map<String, ExecutorService> EXECUTORS = new HashMap<>();
private static final Map<String, Thread> THREAD_BY_ID = new HashMap<>(); private static final Map<String, Thread> THREADS = new HashMap<>();
private static final int POOL_SIZE = 10; private static final int POOL_SIZE = 10;
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
@ -40,8 +40,8 @@ public class ThreadUtils {
synchronized (EXECUTORS) { synchronized (EXECUTORS) {
if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1)); if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1));
EXECUTORS.get(threadId).execute(() -> { EXECUTORS.get(threadId).execute(() -> {
synchronized (THREAD_BY_ID) { synchronized (THREADS) {
THREAD_BY_ID.put(threadId, Thread.currentThread()); THREADS.put(threadId, Thread.currentThread());
} }
command.run(); command.run();
}); });
@ -53,8 +53,10 @@ public class ThreadUtils {
command.run(); command.run();
} else { } else {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
execute(command, threadId); // run task execute(() -> {
execute(() -> latch.countDown(), threadId); // await next tick command.run();
latch.countDown();
}, threadId);
try { try {
latch.await(); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -64,25 +66,29 @@ public class ThreadUtils {
} }
public static void shutDown(String threadId, long timeoutMs) { public static void shutDown(String threadId, long timeoutMs) {
ExecutorService pool = null; ExecutorService pool = null;
synchronized (EXECUTORS) { synchronized (EXECUTORS) {
if (!EXECUTORS.containsKey(threadId)) return; // thread not found if (!EXECUTORS.containsKey(threadId)) return; // thread not found
pool = EXECUTORS.get(threadId); pool = EXECUTORS.get(threadId);
} }
pool.shutdown(); pool.shutdown();
try { try {
if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow();
} catch (InterruptedException e) { } catch (InterruptedException e) {
pool.shutdownNow(); pool.shutdownNow();
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
synchronized (EXECUTORS) { remove(threadId);
EXECUTORS.remove(threadId); }
} }
synchronized (THREAD_BY_ID) {
THREAD_BY_ID.remove(threadId); public static void remove(String threadId) {
} synchronized (EXECUTORS) {
} EXECUTORS.remove(threadId);
}
synchronized (THREADS) {
THREADS.remove(threadId);
}
} }
public static Future<?> submitToPool(Runnable task) { public static Future<?> submitToPool(Runnable task) {
@ -136,9 +142,9 @@ public class ThreadUtils {
} }
private static boolean isCurrentThread(Thread thread, String threadId) { private static boolean isCurrentThread(Thread thread, String threadId) {
synchronized (THREAD_BY_ID) { synchronized (THREADS) {
if (!THREAD_BY_ID.containsKey(threadId)) return false; if (!THREADS.containsKey(threadId)) return false;
return thread == THREAD_BY_ID.get(threadId); return thread == THREADS.get(threadId);
} }
} }
} }

View file

@ -71,8 +71,10 @@ public class UserThread {
command.run(); command.run();
} else { } else {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
execute(command); // run task execute(() -> {
execute(() -> latch.countDown()); // await next tick command.run();
latch.countDown();
});
try { try {
latch.await(); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {

View file

@ -331,7 +331,8 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
} }
public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) { public void maybeReprocessDisputeClosedMessage(Trade trade, boolean reprocessOnError) {
new Thread(() -> { if (trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> {
synchronized (trade) { synchronized (trade) {
// skip if no need to reprocess // skip if no need to reprocess
@ -342,7 +343,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
log.warn("Reprocessing dispute closed message for {} {}", trade.getClass().getSimpleName(), trade.getId()); log.warn("Reprocessing dispute closed message for {} {}", trade.getClass().getSimpleName(), trade.getId());
handleDisputeClosedMessage(trade.getArbitrator().getDisputeClosedMessage(), reprocessOnError); handleDisputeClosedMessage(trade.getArbitrator().getDisputeClosedMessage(), reprocessOnError);
} }
}).start(); }, trade.getId());
} }
private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) { private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) {

View file

@ -591,6 +591,12 @@ public abstract class Trade implements Tradable, Model {
ThreadUtils.await(() -> { ThreadUtils.await(() -> {
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
// check if done
if (isPayoutUnlocked()) {
clearAndShutDown();
return;
}
// set arbitrator pub key ring once known // set arbitrator pub key ring once known
serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> {
getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing());
@ -603,12 +609,6 @@ public abstract class Trade implements Tradable, Model {
}); });
}); });
// check if done
if (isPayoutUnlocked()) {
maybeClearProcessData();
return;
}
// reset buyer's payment sent state if no ack receive // reset buyer's payment sent state if no ack receive
if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
@ -623,6 +623,7 @@ public abstract class Trade implements Tradable, Model {
// handle trade state events // handle trade state events
tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) { if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod(); updateWalletRefreshPeriod();
@ -633,6 +634,7 @@ public abstract class Trade implements Tradable, Model {
// handle trade phase events // handle trade phase events
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
if (isPaymentReceived()) { if (isPaymentReceived()) {
@ -648,6 +650,7 @@ public abstract class Trade implements Tradable, Model {
// handle payout events // handle payout events
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod(); if (isPayoutPublished()) updateWalletRefreshPeriod();
@ -679,21 +682,7 @@ public abstract class Trade implements Tradable, Model {
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return; if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
ThreadUtils.execute(() -> { clearAndShutDown();
deleteWallet();
maybeClearProcessData();
if (idlePayoutSyncer != null) {
xmrWalletService.removeWalletListener(idlePayoutSyncer);
idlePayoutSyncer = null;
}
UserThread.execute(() -> {
if (payoutStateSubscription != null) {
payoutStateSubscription.unsubscribe();
payoutStateSubscription = null;
}
});
}, getId());
} }
}, getId()); }, getId());
}); });
@ -912,7 +901,7 @@ public abstract class Trade implements Tradable, Model {
throw new RuntimeException("Refusing to delete wallet for " + getClass().getSimpleName() + " " + getId() + " because it has a balance"); throw new RuntimeException("Refusing to delete wallet for " + getClass().getSimpleName() + " " + getId() + " because it has a balance");
} }
// force stop the wallet // force stop wallet
if (wallet != null) stopWallet(); if (wallet != null) stopWallet();
// delete wallet // delete wallet
@ -1195,21 +1184,24 @@ public abstract class Trade implements Tradable, Model {
return payoutAmountFromMediation < normalPayoutAmount; return payoutAmountFromMediation < normalPayoutAmount;
} }
public void maybeClearProcessData() { public void clearAndShutDown() {
ThreadUtils.execute(() -> clearProcessData(), getId());
ThreadUtils.submitToPool(() -> shutDown()); // run off trade thread
}
private void clearProcessData() {
// delete trade wallet
synchronized (walletLock) { synchronized (walletLock) {
if (!walletExists()) return; // done if already cleared
// skip if already cleared
if (!walletExists()) return;
// delete trade wallet
deleteWallet(); deleteWallet();
}
// TODO: clear other process data // TODO: clear other process data
setPayoutTxHex(null); setPayoutTxHex(null);
for (TradePeer peer : getPeers()) { for (TradePeer peer : getPeers()) {
peer.setUnsignedPayoutTxHex(null); peer.setUnsignedPayoutTxHex(null);
peer.setUpdatedMultisigHex(null); peer.setUpdatedMultisigHex(null);
}
} }
} }
@ -1230,6 +1222,8 @@ public abstract class Trade implements Tradable, Model {
} }
public void shutDown() { public void shutDown() {
if (isShutDown) return; // ignore if already shut down
isShutDownStarted = true;
if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId()); if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());
// shut down thread pools with timeout // shut down thread pools with timeout

View file

@ -1262,10 +1262,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// remove trade from list // remove trade from list
removeTrade(trade); removeTrade(trade);
// delete trade wallet
if (trade.walletExists()) trade.deleteWallet();
} }
// clear and shut down trade
trade.clearAndShutDown();
} }
private void listenForCleanup(Trade trade) { private void listenForCleanup(Trade trade) {

View file

@ -17,6 +17,7 @@
package haveno.core.trade.protocol; package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler; import haveno.common.handlers.ErrorMessageHandler;
import haveno.common.handlers.ResultHandler; import haveno.common.handlers.ResultHandler;
import haveno.core.trade.BuyerTrade; import haveno.core.trade.BuyerTrade;
@ -97,7 +98,7 @@ public class BuyerProtocol extends DisputeProtocol {
public void onPaymentSent(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void onPaymentSent(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerProtocol.onPaymentSent()"); System.out.println("BuyerProtocol.onPaymentSent()");
new Thread(() -> { ThreadUtils.execute(() -> {
synchronized (trade) { synchronized (trade) {
latchTrade(); latchTrade();
this.errorMessageHandler = errorMessageHandler; this.errorMessageHandler = errorMessageHandler;
@ -127,7 +128,7 @@ public class BuyerProtocol extends DisputeProtocol {
} }
awaitTradeLatch(); awaitTradeLatch();
} }
}).start(); }, trade.getId());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View file

@ -17,6 +17,7 @@
package haveno.core.trade.protocol; package haveno.core.trade.protocol;
import haveno.common.ThreadUtils;
import haveno.common.handlers.ErrorMessageHandler; import haveno.common.handlers.ErrorMessageHandler;
import haveno.common.handlers.ResultHandler; import haveno.common.handlers.ResultHandler;
import haveno.core.trade.SellerTrade; import haveno.core.trade.SellerTrade;
@ -94,7 +95,7 @@ public class SellerProtocol extends DisputeProtocol {
public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
log.info("SellerProtocol.onPaymentReceived()"); log.info("SellerProtocol.onPaymentReceived()");
new Thread(() -> { ThreadUtils.execute(() -> {
synchronized (trade) { synchronized (trade) {
latchTrade(); latchTrade();
this.errorMessageHandler = errorMessageHandler; this.errorMessageHandler = errorMessageHandler;
@ -123,7 +124,7 @@ public class SellerProtocol extends DisputeProtocol {
} }
awaitTradeLatch(); awaitTradeLatch();
} }
}).start(); }, trade.getId());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View file

@ -265,6 +265,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
} }
public void maybeSendDepositsConfirmedMessages() { public void maybeSendDepositsConfirmedMessages() {
if (!trade.isInitialized() || trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return; if (!trade.isDepositsConfirmed() || trade.isDepositsConfirmedAcked() || trade.isPayoutPublished()) return;
synchronized (trade) { synchronized (trade) {
@ -286,6 +287,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
} }
public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) { public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
if (trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
synchronized (trade) { synchronized (trade) {

View file

@ -276,13 +276,13 @@ public class BroadcastHandler implements PeerManager.Listener {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable);
numOfFailedBroadcasts.incrementAndGet();
if (stopped.get()) { if (stopped.get()) {
return; return;
} }
log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable);
numOfFailedBroadcasts.incrementAndGet();
maybeNotifyListeners(broadcastRequestsForConnection); maybeNotifyListeners(broadcastRequestsForConnection);
checkForCompletion(); checkForCompletion();
} }