diff --git a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java index 0626294acc..0c116eb67e 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java @@ -121,7 +121,7 @@ public class ArbitratorProtocol extends DisputeProtocol { public void handleError(String errorMessage) { // set trade state to send deposit responses with nack if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) { - trade.setState(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); + trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); } super.handleError(errorMessage); } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java index 34133d4eac..45ce638f80 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java @@ -44,6 +44,7 @@ import java.util.UUID; public class ArbitratorProcessDepositRequest extends TradeTask { private Throwable error; + private boolean depositResponsesSent; @SuppressWarnings({"unused"}) public ArbitratorProcessDepositRequest(TaskRunner taskHandler, Trade trade) { @@ -68,7 +69,7 @@ public class ArbitratorProcessDepositRequest extends TradeTask { } catch (Throwable t) { this.error = t; t.printStackTrace(); - trade.setState(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); + trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); failed(t); } processModel.getTradeManager().requestPersistence(); @@ -137,32 +138,33 @@ public class ArbitratorProcessDepositRequest extends TradeTask { if (isTimedOut()) throw new RuntimeException("Trade protocol has timed out before relaying deposit txs for {} {}" + trade.getClass().getSimpleName() + " " + trade.getShortId()); trade.addInitProgressStep(); + boolean depositTxsRelayed = false; try { // submit txs to pool but do not relay MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true); - if (!makerResult.isGood()) throw new RuntimeException("Error submitting maker deposit tx: " + JsonUtils.serialize(makerResult)); MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true); + if (!makerResult.isGood()) throw new RuntimeException("Error submitting maker deposit tx: " + JsonUtils.serialize(makerResult)); if (!takerResult.isGood()) throw new RuntimeException("Error submitting taker deposit tx: " + JsonUtils.serialize(takerResult)); // relay txs daemon.relayTxsByHash(Arrays.asList(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash())); + depositTxsRelayed = true; // update trade state log.info("Arbitrator published deposit txs for trade " + trade.getId()); - trade.setState(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS); + trade.setStateIfValidTransitionTo(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS); } catch (Exception e) { + log.warn("Arbitrator error publishing deposit txs for trade {} {}: {}", trade.getClass().getSimpleName(), trade.getShortId(), e.getMessage()); + e.printStackTrace(); + if (!depositTxsRelayed) { - // flush txs from pool - try { - daemon.flushTxPool(processModel.getMaker().getDepositTxHash()); - } catch (Exception e2) { - e2.printStackTrace(); - } - try { - daemon.flushTxPool(processModel.getTaker().getDepositTxHash()); - } catch (Exception e2) { - e2.printStackTrace(); + // flush txs from pool + try { + daemon.flushTxPool(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash()); + } catch (Exception e2) { + e2.printStackTrace(); + } } throw e; } @@ -170,10 +172,11 @@ public class ArbitratorProcessDepositRequest extends TradeTask { // subscribe to trade state once to send responses with ack or nack trade.stateProperty().addListener((obs, oldState, newState) -> { + if (oldState == newState) return; if (newState == Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED) { - sendDepositResponses(error == null ? "Arbitrator failed to publish deposit txs within timeout for trade " + trade.getId() : error.getMessage()); - } else if (newState == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) { - sendDepositResponses(null); + sendDepositResponsesOnce(error == null ? "Arbitrator failed to publish deposit txs within timeout for trade " + trade.getId() : error.getMessage()); + } else if (newState.ordinal() >= Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS.ordinal()) { + sendDepositResponsesOnce(null); } }); @@ -186,7 +189,17 @@ public class ArbitratorProcessDepositRequest extends TradeTask { return !processModel.getTradeManager().hasOpenTrade(trade); } - private void sendDepositResponses(String errorMessage) { + private synchronized void sendDepositResponsesOnce(String errorMessage) { + + // skip if sent + if (depositResponsesSent) return; + depositResponsesSent = true; + + // log error + if (errorMessage != null) { + log.warn("Sending deposit responses with error={}", errorMessage); + Thread.dumpStack(); + } // create deposit response DepositResponse response = new DepositResponse( @@ -204,7 +217,7 @@ public class ArbitratorProcessDepositRequest extends TradeTask { } private void sendDepositResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, DepositResponse response) { - log.info("Sending deposit response to trader={}; offerId={}", nodeAddress, trade.getId()); + log.info("Sending deposit response to trader={}; offerId={}, error={}", nodeAddress, trade.getId(), error); processModel.getP2PService().sendEncryptedDirectMessage(nodeAddress, pubKeyRing, response, new SendDirectMessageListener() { @Override public void onArrived() { diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java index 2486519df7..75614863ee 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessPaymentReceivedMessage.java @@ -94,8 +94,8 @@ public class ProcessPaymentReceivedMessage extends TradeTask { } trade.requestPersistence(); - // process payout tx unless already published - if (!trade.isPayoutPublished()) processPayoutTx(message); + // process payout tx if not confirmed + if (!trade.isPayoutConfirmed()) processPayoutTx(message); // close open disputes if (trade.isPayoutPublished() && trade.getDisputeState().ordinal() >= Trade.DisputeState.DISPUTE_REQUESTED.ordinal()) {