fix trade initialization error handling and run off trade thread

This commit is contained in:
woodser 2024-01-21 05:28:19 -05:00
parent ea4359d164
commit 892eaa440a

View file

@ -590,141 +590,140 @@ public abstract class Trade implements Tradable, Model {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void initialize(ProcessModelServiceProvider serviceProvider) { public void initialize(ProcessModelServiceProvider serviceProvider) {
ThreadUtils.await(() -> { if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
// check if done // done if payout unlocked
if (isPayoutUnlocked()) { if (isPayoutUnlocked()) {
clearAndShutDown(); clearAndShutDown();
return; return;
} }
// set arbitrator pub key ring once known // set arbitrator pub key ring once known
serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> {
getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing());
});
// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
ThreadUtils.submitToPool(() -> { // TODO: remove this?
ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
}); });
});
// handle connection change on dedicated thread // reset buyer's payment sent state if no ack receive
xmrConnectionService.addConnectionListener(connection -> { if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) {
ThreadUtils.submitToPool(() -> { // TODO: remove this? log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN);
}); }
});
// reset buyer's payment sent state if no ack receive // reset seller's payment received state if no ack receive
if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
} }
// reset seller's payment received state if no ack receive // handle trade state events
if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) { tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> {
log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); if (!isInitialized || isShutDownStarted) return;
setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); ThreadUtils.execute(() -> {
} if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod();
startPolling();
}
}, getId());
});
// handle trade state events // handle trade phase events
tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
if (isShutDownStarted) return; if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) { if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
updateWalletRefreshPeriod(); if (isPaymentReceived()) {
startPolling(); UserThread.execute(() -> {
} if (tradePhaseSubscription != null) {
}, getId()); tradePhaseSubscription.unsubscribe();
}); tradePhaseSubscription = null;
// handle trade phase events
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod();
if (isPaymentReceived()) {
UserThread.execute(() -> {
if (tradePhaseSubscription != null) {
tradePhaseSubscription.unsubscribe();
tradePhaseSubscription = null;
}
});
}
}, getId());
});
// handle payout events
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
if (isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod();
// handle when payout published
if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) {
log.info("Payout published for {} {}", getClass().getSimpleName(), getId());
// sync main wallet to update pending balance
new Thread(() -> {
GenUtils.waitFor(1000);
if (isShutDownStarted) return;
if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet());
}).start();
// complete disputed trade
if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) {
processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED);
if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets
} }
});
}
}, getId());
});
// auto complete arbitrator trade // handle payout events
if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this); payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod();
// reset address entries // handle when payout published
processModel.getXmrWalletService().resetAddressEntriesForTrade(getId()); if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) {
log.info("Payout published for {} {}", getClass().getSimpleName(), getId());
// sync main wallet to update pending balance
new Thread(() -> {
GenUtils.waitFor(1000);
if (isShutDownStarted) return;
if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet());
}).start();
// complete disputed trade
if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) {
processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED);
if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets
} }
// handle when payout unlocks // auto complete arbitrator trade
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this);
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
clearAndShutDown();
}
}, getId());
});
// arbitrator syncs idle wallet when payout unlock expected // reset address entries
if (this instanceof ArbitratorTrade) { processModel.getXmrWalletService().resetAddressEntriesForTrade(getId());
idlePayoutSyncer = new IdlePayoutSyncer();
xmrWalletService.addWalletListener(idlePayoutSyncer);
}
// TODO: buyer's payment sent message state property can become unsynced (after improper shut down?)
if (isBuyer()) {
MessageState expectedState = getPaymentSentMessageState();
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get());
processModel.getPaymentSentMessageStateProperty().set(expectedState);
} }
}
// trade is initialized // handle when payout unlocks
isInitialized = true; if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
// done if payout unlocked or deposit not requested log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
if (!isDepositRequested() || isPayoutUnlocked()) return; clearAndShutDown();
// done if wallet does not exist
if (!walletExists()) {
MoneroTx payoutTx = getPayoutTx();
if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) {
log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState());
setPayoutStateUnlocked();
return;
} else {
throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId());
} }
} }, getId());
});
// initialize syncing and polling // arbitrator syncs idle wallet when payout unlock expected
initSyncing(); if (this instanceof ArbitratorTrade) {
}, getId()); idlePayoutSyncer = new IdlePayoutSyncer();
xmrWalletService.addWalletListener(idlePayoutSyncer);
}
// TODO: buyer's payment sent message state property can become unsynced (after improper shut down?)
if (isBuyer()) {
MessageState expectedState = getPaymentSentMessageState();
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get());
processModel.getPaymentSentMessageStateProperty().set(expectedState);
}
}
// trade is initialized
isInitialized = true;
// done if deposit not requested or payout unlocked
if (!isDepositRequested() || isPayoutUnlocked()) return;
// open wallet or done if wallet does not exist
if (walletExists()) getWallet();
else {
MoneroTx payoutTx = getPayoutTx();
if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) {
log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState());
setPayoutStateUnlocked();
return;
} else {
throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId());
}
}
// initialize syncing and polling
tryInitSyncing();
} }
public void requestPersistence() { public void requestPersistence() {
@ -1940,12 +1939,12 @@ public abstract class Trade implements Tradable, Model {
// sync and reprocess messages on new thread // sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) { if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
ThreadUtils.execute(() -> initSyncing(), getId()); ThreadUtils.execute(() -> tryInitSyncing(), getId());
} }
} }
} }
private void initSyncing() { private void tryInitSyncing() {
if (isShutDownStarted) return; if (isShutDownStarted) return;
if (!isIdling()) { if (!isIdling()) {
initSyncingAux(); initSyncingAux();