improve trade state reliability

update trade state atomically on UserThread
nullify error handling on deposits confirmed message
set trade state before deposit request and relay
add checks before deleting wallet
UserThread.await() detects if on UserThread
This commit is contained in:
woodser 2023-12-24 12:09:53 -05:00
parent 3de4264c4b
commit 6c2f3ea154
7 changed files with 65 additions and 28 deletions

View file

@ -45,6 +45,7 @@ public class UserThread {
@Getter @Getter
@Setter @Setter
private static Executor executor; private static Executor executor;
private static final String USER_THREAD_NAME = "UserThread";
public static void setTimerClass(Class<? extends Timer> timerClass) { public static void setTimerClass(Class<? extends Timer> timerClass) {
UserThread.timerClass = timerClass; UserThread.timerClass = timerClass;
@ -57,21 +58,30 @@ public class UserThread {
} }
public static void execute(Runnable command) { public static void execute(Runnable command) {
UserThread.executor.execute(command); UserThread.executor.execute(() -> {
Thread.currentThread().setName(USER_THREAD_NAME);
command.run();
});
} }
public static void await(Runnable command) { public static void await(Runnable command) {
CountDownLatch latch = new CountDownLatch(1); if (isUserThread(Thread.currentThread())) {
executor.execute(() -> {
command.run(); command.run();
latch.countDown(); } else {
}); CountDownLatch latch = new CountDownLatch(1);
execute(command); // run task
execute(() -> latch.countDown()); // await next tick
try { try {
latch.await(); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
}
public static boolean isUserThread(Thread thread) {
return USER_THREAD_NAME.equals(thread.getName());
}
// Prefer FxTimer if a delay is needed in a JavaFx class (gui module) // Prefer FxTimer if a delay is needed in a JavaFx class (gui module)
public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) { public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) {

View file

@ -647,7 +647,7 @@ public final class XmrConnectionService {
if (DevEnv.isDevMode()) e.printStackTrace(); if (DevEnv.isDevMode()) e.printStackTrace();
} }
// check connection which notifies of changes new Thread(() -> {
if (connectionManager.getAutoSwitch()) connectionManager.setConnection(connectionManager.getBestAvailableConnection()); if (connectionManager.getAutoSwitch()) connectionManager.setConnection(connectionManager.getBestAvailableConnection());
else connectionManager.checkConnection(); else connectionManager.checkConnection();
@ -655,6 +655,7 @@ public final class XmrConnectionService {
if (!Boolean.TRUE.equals(connectionManager.isConnected()) && HavenoUtils.havenoSetup != null) { if (!Boolean.TRUE.equals(connectionManager.isConnected()) && HavenoUtils.havenoSetup != null) {
HavenoUtils.havenoSetup.getWalletServiceErrorMsg().set(e.getMessage()); HavenoUtils.havenoSetup.getWalletServiceErrorMsg().set(e.getMessage());
} }
}).start();
} finally { } finally {
pollInProgress = false; pollInProgress = false;
} }

View file

@ -802,6 +802,17 @@ public abstract class Trade implements Tradable, Model {
return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && syncNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && syncNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden
} }
public boolean isSyncedWithinTolerance() {
synchronized (walletLock) {
if (wallet == null) return false;
if (!xmrConnectionService.isSyncedWithinTolerance()) return false;
Long targetHeight = xmrConnectionService.getTargetHeight();
if (targetHeight == null) return false;
if (targetHeight - wallet.getHeight() <= 3) return true; // synced if within 3 blocks of target height
return false;
}
}
public void syncAndPollWallet() { public void syncAndPollWallet() {
syncWallet(true); syncWallet(true);
} }
@ -880,13 +891,25 @@ public abstract class Trade implements Tradable, Model {
if (walletExists()) { if (walletExists()) {
try { try {
// check if synced
if (!isSyncedWithinTolerance()) {
log.warn("Refusing to delete wallet for {} {} because it is not synced within tolerance", getClass().getSimpleName(), getId());
return;
}
// check if balance > 0
if (wallet.getBalance().compareTo(BigInteger.ZERO) > 0) {
log.warn("Refusing to delete wallet for {} {} because it contains a balance", getClass().getSimpleName(), getId());
return;
}
// check if funds deposited but payout not unlocked // check if funds deposited but payout not unlocked
if (isDepositsPublished() && !isPayoutUnlocked()) { if (isDepositsPublished() && !isPayoutUnlocked()) {
throw new RuntimeException("Refusing to delete wallet for " + getClass().getSimpleName() + " " + getId() + " because the deposit txs have been published but payout tx has not unlocked"); throw new RuntimeException("Refusing to delete wallet for " + getClass().getSimpleName() + " " + getId() + " because the deposit txs have been published but payout tx has not unlocked");
} }
// force stop the wallet // force stop the wallet
if (wallet != null) stopWallet(); stopWallet();
// delete wallet // delete wallet
log.info("Deleting wallet for {} {}", getClass().getSimpleName(), getId()); log.info("Deleting wallet for {} {}", getClass().getSimpleName(), getId());
@ -1279,7 +1302,7 @@ public abstract class Trade implements Tradable, Model {
} }
this.state = state; this.state = state;
UserThread.execute(() -> { UserThread.await(() -> {
stateProperty.set(state); stateProperty.set(state);
phaseProperty.set(state.getPhase()); phaseProperty.set(state.getPhase());
}); });
@ -1310,9 +1333,7 @@ public abstract class Trade implements Tradable, Model {
} }
this.payoutState = payoutState; this.payoutState = payoutState;
UserThread.execute(() -> { UserThread.await(() -> payoutStateProperty.set(payoutState));
payoutStateProperty.set(payoutState);
});
} }
public void setDisputeState(DisputeState disputeState) { public void setDisputeState(DisputeState disputeState) {

View file

@ -1225,7 +1225,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
private void removeTradeOnError(Trade trade) { private void removeTradeOnError(Trade trade) {
log.warn("TradeManager.removeTradeOnError() " + trade.getId()); log.warn("TradeManager.removeTradeOnError() tradeId={}, state={}", trade.getId(), trade.getState());
synchronized (tradableList) { synchronized (tradableList) {
// unreserve taker key images // unreserve taker key images

View file

@ -429,7 +429,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.using(new TradeTaskRunner(trade, .using(new TradeTaskRunner(trade,
() -> { () -> {
stopTimeout(); stopTimeout();
this.errorMessageHandler = null; this.errorMessageHandler = null; // TODO: set this when trade state is >= DEPOSIT_PUBLISHED
handleTaskRunnerSuccess(sender, response); handleTaskRunnerSuccess(sender, response);
if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized
}, },
@ -446,6 +446,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)"); System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)");
synchronized (trade) { synchronized (trade) {
latchTrade(); latchTrade();
this.errorMessageHandler = null;
expect(new Condition(trade) expect(new Condition(trade)
.with(response) .with(response)
.from(sender)) .from(sender))

View file

@ -112,6 +112,10 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
// TODO (woodser): add small delay so tx has head start against double spend attempts? // TODO (woodser): add small delay so tx has head start against double spend attempts?
if (processModel.getMaker().getDepositTxHex() != null && processModel.getTaker().getDepositTxHex() != null) { if (processModel.getMaker().getDepositTxHex() != null && processModel.getTaker().getDepositTxHex() != null) {
// update trade state
trade.setState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST);
processModel.getTradeManager().requestPersistence();
// relay txs // relay txs
MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true); MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true);
MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true); MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true);

View file

@ -84,6 +84,11 @@ public class ProcessSignContractResponse extends TradeTask {
trade.getSelf().getDepositTx().getKey(), trade.getSelf().getDepositTx().getKey(),
trade.getSelf().getPaymentAccountKey()); trade.getSelf().getPaymentAccountKey());
// update trade state
trade.setState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST);
processModel.getTradeManager().requestPersistence();
trade.addInitProgressStep();
// send request to arbitrator // send request to arbitrator
log.info("Sending {} to arbitrator {}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getArbitrator().getNodeAddress(), trade.getId(), request.getUid()); log.info("Sending {} to arbitrator {}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getArbitrator().getNodeAddress(), trade.getId(), request.getUid());
processModel.getP2PService().sendEncryptedDirectMessage(trade.getArbitrator().getNodeAddress(), trade.getArbitrator().getPubKeyRing(), request, new SendDirectMessageListener() { processModel.getP2PService().sendEncryptedDirectMessage(trade.getArbitrator().getNodeAddress(), trade.getArbitrator().getPubKeyRing(), request, new SendDirectMessageListener() {
@ -101,11 +106,6 @@ public class ProcessSignContractResponse extends TradeTask {
failed(); failed();
} }
}); });
// deposit is requested
trade.setState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST);
trade.addInitProgressStep();
processModel.getTradeManager().requestPersistence();
} else { } else {
log.info("Waiting for another contract signature to send deposit request"); log.info("Waiting for another contract signature to send deposit request");
complete(); // does not yet have needed signatures complete(); // does not yet have needed signatures