From 3cde880b1c508adffdd0cfc5df019ce12df0ff93 Mon Sep 17 00:00:00 2001 From: woodser <13068859+woodser@users.noreply.github.com> Date: Fri, 14 Mar 2025 08:55:47 -0400 Subject: [PATCH] refactor message resending, reprocessing, and ack handling --- .../main/java/haveno/core/trade/Trade.java | 77 +++++-------- .../core/trade/protocol/ProcessModel.java | 84 ++++++-------- .../core/trade/protocol/SellerProtocol.java | 49 ++++---- .../haveno/core/trade/protocol/TradePeer.java | 100 +++++++++++++++- .../core/trade/protocol/TradeProtocol.java | 108 ++++++++++++------ .../tasks/BuyerSendPaymentSentMessage.java | 23 ++-- ...yerSendPaymentSentMessageToArbitrator.java | 21 +--- .../BuyerSendPaymentSentMessageToSeller.java | 14 +-- .../SellerSendPaymentReceivedMessage.java | 85 +++++++++++++- ...llerSendPaymentReceivedMessageToBuyer.java | 24 ++++ .../tasks/SendDepositsConfirmedMessage.java | 27 +++-- ...dDepositsConfirmedMessageToArbitrator.java | 12 +- .../SendDepositsConfirmedMessageToBuyer.java | 12 +- .../SendDepositsConfirmedMessageToSeller.java | 12 +- .../pendingtrades/PendingTradesViewModel.java | 3 +- .../steps/seller/SellerStep3View.java | 1 + proto/src/main/proto/pb.proto | 10 +- 17 files changed, 419 insertions(+), 243 deletions(-) diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 13594cc3a1..72d77de7d6 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -46,7 +46,6 @@ import haveno.common.taskrunner.Model; import haveno.common.util.Utilities; import haveno.core.monetary.Price; import haveno.core.monetary.Volume; -import haveno.core.network.MessageState; import haveno.core.offer.Offer; import haveno.core.offer.OfferDirection; import haveno.core.offer.OpenOffer; @@ -195,7 +194,8 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { SELLER_SENT_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), - SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED); + SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), + BUYER_RECEIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED); @NotNull public Phase getPhase() { @@ -603,12 +603,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { } } - // notified from TradeProtocol of ack messages - public void onAckMessage(AckMessage ackMessage, NodeAddress sender) { + // notified from TradeProtocol of ack messages + public void onAckMessage(AckMessage ackMessage, NodeAddress sender) { for (TradeListener listener : new ArrayList<TradeListener>(tradeListeners)) { // copy array to allow listener invocation to unregister listener without concurrent modification exception listener.onAckMessage(ackMessage, sender); } - } + } /////////////////////////////////////////////////////////////////////////////////////////// @@ -618,8 +618,9 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { public void initialize(ProcessModelServiceProvider serviceProvider) { if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); - // done if payout unlocked and marked complete - if (isPayoutUnlocked() && isCompleted()) { + // skip initialization if trade is complete + // starting in v1.0.19, seller resends payment received message until acked or stored in mailbox + if (isPayoutUnlocked() && isCompleted() && !getProtocol().needsToResendPaymentReceivedMessages()) { clearAndShutDown(); return; } @@ -733,15 +734,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { xmrWalletService.addWalletListener(idlePayoutSyncer); } - // TODO: buyer's payment sent message state property became unsynced if shut down while awaiting ack from seller. fixed in v1.0.19 so this check can be removed? - if (isBuyer()) { - MessageState expectedState = getPaymentSentMessageState(); - if (expectedState != null && expectedState != processModel.getPaymentSentMessageStatePropertySeller().get()) { - log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStatePropertySeller().get()); - processModel.getPaymentSentMessageStatePropertySeller().set(expectedState); - } - } - // handle confirmations walletHeight.addListener((observable, oldValue, newValue) -> { importMultisigHexIfScheduled(); @@ -771,11 +763,20 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { } } - // start polling if deposit requested - if (isDepositRequested()) tryInitPolling(); + // init syncing if deposit requested + if (isDepositRequested()) { + tryInitSyncing(); + } isFullyInitialized = true; } + public void reprocessApplicableMessages() { + if (!isDepositRequested() || isPayoutUnlocked() || isCompleted()) return; + getProtocol().maybeReprocessPaymentSentMessage(false); + getProtocol().maybeReprocessPaymentReceivedMessage(false); + HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false); + } + public void awaitInitialized() { while (!isFullyInitialized) HavenoUtils.waitFor(100); // TODO: use proper notification and refactor isInitialized, fullyInitialized, and arbitrator idling } @@ -1535,7 +1536,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { peer.setUpdatedMultisigHex(null); peer.setDisputeClosedMessage(null); peer.setPaymentSentMessage(null); - peer.setPaymentReceivedMessage(null); + if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null); } } @@ -2049,25 +2050,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { throw new IllegalArgumentException("Trade is not buyer, seller, or arbitrator"); } - public MessageState getPaymentSentMessageState() { - if (isPaymentReceived()) return MessageState.ACKNOWLEDGED; - if (processModel.getPaymentSentMessageStatePropertySeller().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED; - switch (state) { - case BUYER_SENT_PAYMENT_SENT_MSG: - return MessageState.SENT; - case BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG: - return MessageState.ARRIVED; - case BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG: - return MessageState.STORED_IN_MAILBOX; - case SELLER_RECEIVED_PAYMENT_SENT_MSG: - return MessageState.ACKNOWLEDGED; - case BUYER_SEND_FAILED_PAYMENT_SENT_MSG: - return MessageState.FAILED; - default: - return null; - } - } - public String getPeerRole(TradePeer peer) { if (peer == getBuyer()) return "Buyer"; if (peer == getSeller()) return "Seller"; @@ -2444,11 +2426,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { // sync and reprocess messages on new thread if (isInitialized && connection != null && !Boolean.FALSE.equals(xmrConnectionService.isConnected())) { - ThreadUtils.execute(() -> tryInitPolling(), getId()); + ThreadUtils.execute(() -> tryInitSyncing(), getId()); } } } - private void tryInitPolling() { + + private void tryInitSyncing() { if (isShutDownStarted) return; // set known deposit txs @@ -2457,24 +2440,18 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { // start polling if (!isIdling()) { - tryInitPollingAux(); + doTryInitSyncing(); } else { long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getPollPeriod()); // random time to start polling UserThread.runAfter(() -> ThreadUtils.execute(() -> { - if (!isShutDownStarted) tryInitPollingAux(); + if (!isShutDownStarted) doTryInitSyncing(); }, getId()), startSyncingInMs / 1000l); } } - private void tryInitPollingAux() { + private void doTryInitSyncing() { if (!wasWalletSynced) trySyncWallet(true); updatePollPeriod(); - - // reprocess pending messages - getProtocol().maybeReprocessPaymentSentMessage(false); - getProtocol().maybeReprocessPaymentReceivedMessage(false); - HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false); - startPolling(); } @@ -2825,7 +2802,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { if (!isShutDownStarted) wallet = getWallet(); restartInProgress = false; pollWallet(); - if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId()); + if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitSyncing(), getId()); } private void setStateDepositsSeen() { diff --git a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java index 8521174ca8..209e28afd5 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java +++ b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java @@ -44,6 +44,7 @@ import haveno.core.account.witness.AccountAgeWitnessService; import haveno.core.filter.FilterManager; import haveno.core.network.MessageState; import haveno.core.offer.Offer; +import haveno.core.offer.OfferDirection; import haveno.core.offer.OpenOfferManager; import haveno.core.payment.PaymentAccount; import haveno.core.payment.payload.PaymentAccountPayload; @@ -73,6 +74,9 @@ import org.bitcoinj.core.Coin; import org.bitcoinj.core.Transaction; import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.List; import java.util.Optional; // Fields marked as transient are only used during protocol execution which are based on directMessages so we do not @@ -161,15 +165,11 @@ public class ProcessModel implements Model, PersistablePayload { @Getter @Setter private boolean importMultisigHexScheduled; - - // We want to indicate the user the state of the message delivery of the - // PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet. - // To enable that even after restart we persist the state. - @Setter - private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED); - @Setter - private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED); private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false); + @Deprecated + private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED); + @Deprecated + private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED); public ProcessModel(String offerId, String accountId, PubKeyRing pubKeyRing) { this(offerId, accountId, pubKeyRing, new TradePeer(), new TradePeer(), new TradePeer()); @@ -191,6 +191,31 @@ public class ProcessModel implements Model, PersistablePayload { this.offer = offer; this.provider = provider; this.tradeManager = tradeManager; + for (TradePeer peer : getTradePeers()) { + peer.applyTransient(tradeManager); + } + + // migrate deprecated fields to new model for v1.0.19 + if (paymentSentMessageStatePropertySeller.get() != MessageState.UNDEFINED && getSeller().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) { + getSeller().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertySeller.get()); + tradeManager.requestPersistence(); + } + if (paymentSentMessageStatePropertyArbitrator.get() != MessageState.UNDEFINED && getArbitrator().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) { + getArbitrator().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertyArbitrator.get()); + tradeManager.requestPersistence(); + } + } + + private List<TradePeer> getTradePeers() { + return Arrays.asList(maker, taker, arbitrator); + } + + private TradePeer getBuyer() { + return offer.getDirection() == OfferDirection.BUY ? maker : taker; + } + + private TradePeer getSeller() { + return offer.getDirection() == OfferDirection.BUY ? taker : maker; } @@ -245,14 +270,13 @@ public class ProcessModel implements Model, PersistablePayload { processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress())); processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress())); + // deprecated fields need to be read in order to migrate to new fields String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller()); MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString); - processModel.setPaymentSentMessageStateSeller(paymentSentMessageStateSeller); - + processModel.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateSeller); String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator()); MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString); - processModel.setPaymentSentMessageStateArbitrator(paymentSentMessageStateArbitrator); - + processModel.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateArbitrator); return processModel; } @@ -279,40 +303,8 @@ public class ProcessModel implements Model, PersistablePayload { return getP2PService().getAddress(); } - void setPaymentSentAckMessageSeller(AckMessage ackMessage) { - MessageState messageState = ackMessage.isSuccess() ? - MessageState.ACKNOWLEDGED : - MessageState.FAILED; - setPaymentSentMessageStateSeller(messageState); - } - - void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) { - MessageState messageState = ackMessage.isSuccess() ? - MessageState.ACKNOWLEDGED : - MessageState.FAILED; - setPaymentSentMessageStateArbitrator(messageState); - } - - public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) { - this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty); - if (tradeManager != null) { - tradeManager.requestPersistence(); - } - } - - public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessageStateProperty) { - this.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateProperty); - if (tradeManager != null) { - tradeManager.requestPersistence(); - } - } - - public boolean isPaymentSentMessageAckedBySeller() { - return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED; - } - - public boolean isPaymentSentMessageAckedByArbitrator() { - return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED; + public boolean isPaymentReceivedMessagesReceived() { + return getArbitrator().isPaymentReceivedMessageReceived() && getBuyer().isPaymentReceivedMessageReceived(); } void setDepositTxSentAckMessage(AckMessage ackMessage) { diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java index 2b7f45877a..4daa800229 100644 --- a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java @@ -53,6 +53,9 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class SellerProtocol extends DisputeProtocol { + + private static final long RESEND_PAYMENT_RECEIVED_MSGS_AFTER = 1741629525730L; // Mar 10 2025 17:58 UTC + enum SellerEvent implements FluentProtocol.Event { STARTUP, DEPOSIT_TXS_CONFIRMED, @@ -69,31 +72,37 @@ public class SellerProtocol extends DisputeProtocol { // re-send payment received message if payout not published ThreadUtils.execute(() -> { - if (trade.isShutDownStarted() || trade.isPayoutPublished()) return; + if (!needsToResendPaymentReceivedMessages()) return; synchronized (trade.getLock()) { - if (trade.isShutDownStarted() || trade.isPayoutPublished()) return; - if (trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.isPayoutPublished()) { - latchTrade(); - given(anyPhase(Trade.Phase.PAYMENT_RECEIVED) - .with(SellerEvent.STARTUP)) - .setup(tasks( - SellerSendPaymentReceivedMessageToBuyer.class, - SellerSendPaymentReceivedMessageToArbitrator.class) - .using(new TradeTaskRunner(trade, - () -> { - unlatchTrade(); - }, - (errorMessage) -> { - log.warn("Error sending PaymentReceivedMessage on startup: " + errorMessage); - unlatchTrade(); - }))) - .executeTasks(); - awaitTradeLatch(); - } + if (!needsToResendPaymentReceivedMessages()) return; + latchTrade(); + given(anyPhase(Trade.Phase.PAYMENT_RECEIVED) + .with(SellerEvent.STARTUP)) + .setup(tasks( + SellerSendPaymentReceivedMessageToBuyer.class, + SellerSendPaymentReceivedMessageToArbitrator.class) + .using(new TradeTaskRunner(trade, + () -> { + unlatchTrade(); + }, + (errorMessage) -> { + log.warn("Error sending PaymentReceivedMessage on startup: " + errorMessage); + unlatchTrade(); + }))) + .executeTasks(); + awaitTradeLatch(); } }, trade.getId()); } + public boolean needsToResendPaymentReceivedMessages() { + return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled(); + } + + private boolean resendPaymentReceivedMessagesEnabled() { + return trade.getTakeOfferDate().getTime() > RESEND_PAYMENT_RECEIVED_MSGS_AFTER; + } + @Override protected void onTradeMessage(TradeMessage message, NodeAddress peer) { super.onTradeMessage(message, peer); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java index eeef2d4daf..11c035a329 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java @@ -24,12 +24,17 @@ import haveno.common.crypto.PubKeyRing; import haveno.common.proto.ProtoUtil; import haveno.common.proto.persistable.PersistablePayload; import haveno.core.account.witness.AccountAgeWitness; +import haveno.core.network.MessageState; import haveno.core.payment.payload.PaymentAccountPayload; import haveno.core.proto.CoreProtoResolver; import haveno.core.support.dispute.messages.DisputeClosedMessage; +import haveno.core.trade.TradeManager; import haveno.core.trade.messages.PaymentReceivedMessage; import haveno.core.trade.messages.PaymentSentMessage; +import haveno.network.p2p.AckMessage; import haveno.network.p2p.NodeAddress; +import javafx.beans.property.ObjectProperty; +import javafx.beans.property.SimpleObjectProperty; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -57,6 +62,7 @@ public final class TradePeer implements PersistablePayload { @Nullable transient private byte[] preparedDepositTx; transient private MoneroTxWallet depositTx; + transient private TradeManager tradeManager; // Persistable mutable @Nullable @@ -96,7 +102,6 @@ public final class TradePeer implements PersistablePayload { @Getter private DisputeClosedMessage disputeClosedMessage; - // added in v 0.6 @Nullable private byte[] accountAgeWitnessNonce; @@ -142,13 +147,32 @@ public final class TradePeer implements PersistablePayload { private long payoutAmount; @Nullable private String updatedMultisigHex; - @Getter + @Deprecated + private boolean depositsConfirmedMessageAcked; + + // We want to indicate the user the state of the message delivery of the payment + // confirmation messages. We do an automatic re-send in case it was not ACKed yet. + // To enable that even after restart we persist the state. @Setter - boolean depositsConfirmedMessageAcked; + private ObjectProperty<MessageState> depositsConfirmedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); + @Setter + private ObjectProperty<MessageState> paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); + @Setter + private ObjectProperty<MessageState> paymentReceivedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); public TradePeer() { } + public void applyTransient(TradeManager tradeManager) { + this.tradeManager = tradeManager; + + // migrate deprecated fields to new model for v1.0.19 + if (depositsConfirmedMessageAcked && depositsConfirmedMessageStateProperty.get() == MessageState.UNDEFINED) { + depositsConfirmedMessageStateProperty.set(MessageState.ACKNOWLEDGED); + tradeManager.requestPersistence(); + } + } + public BigInteger getDepositTxFee() { return BigInteger.valueOf(depositTxFee); } @@ -181,6 +205,60 @@ public final class TradePeer implements PersistablePayload { this.payoutAmount = payoutAmount.longValueExact(); } + void setDepositsConfirmedAckMessage(AckMessage ackMessage) { + MessageState messageState = ackMessage.isSuccess() ? + MessageState.ACKNOWLEDGED : + MessageState.FAILED; + setDepositsConfirmedMessageState(messageState); + } + + void setPaymentSentAckMessage(AckMessage ackMessage) { + MessageState messageState = ackMessage.isSuccess() ? + MessageState.ACKNOWLEDGED : + MessageState.FAILED; + setPaymentSentMessageState(messageState); + } + + void setPaymentReceivedAckMessage(AckMessage ackMessage) { + MessageState messageState = ackMessage.isSuccess() ? + MessageState.ACKNOWLEDGED : + MessageState.FAILED; + setPaymentReceivedMessageState(messageState); + } + + public void setDepositsConfirmedMessageState(MessageState depositsConfirmedMessageStateProperty) { + this.depositsConfirmedMessageStateProperty.set(depositsConfirmedMessageStateProperty); + if (tradeManager != null) { + tradeManager.requestPersistence(); + } + } + + public void setPaymentSentMessageState(MessageState paymentSentMessageStateProperty) { + this.paymentSentMessageStateProperty.set(paymentSentMessageStateProperty); + if (tradeManager != null) { + tradeManager.requestPersistence(); + } + } + + public void setPaymentReceivedMessageState(MessageState paymentReceivedMessageStateProperty) { + this.paymentReceivedMessageStateProperty.set(paymentReceivedMessageStateProperty); + if (tradeManager != null) { + tradeManager.requestPersistence(); + } + } + + public boolean isDepositsConfirmedMessageAcked() { + return depositsConfirmedMessageStateProperty.get() == MessageState.ACKNOWLEDGED; + } + + public boolean isPaymentSentMessageAcked() { + return paymentSentMessageStateProperty.get() == MessageState.ACKNOWLEDGED; + } + + public boolean isPaymentReceivedMessageReceived() { + return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; + } + @Override public Message toProtoMessage() { final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder(); @@ -221,6 +299,9 @@ public final class TradePeer implements PersistablePayload { Optional.ofNullable(payoutTxFee).ifPresent(e -> builder.setPayoutTxFee(payoutTxFee)); Optional.ofNullable(payoutAmount).ifPresent(e -> builder.setPayoutAmount(payoutAmount)); builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked); + builder.setDepositsConfirmedMessageState(depositsConfirmedMessageStateProperty.get().name()); + builder.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name()); + builder.setPaymentReceivedMessageState(paymentReceivedMessageStateProperty.get().name()); builder.setCurrentDate(currentDate); return builder.build(); @@ -270,6 +351,19 @@ public final class TradePeer implements PersistablePayload { tradePeer.setUnsignedPayoutTxHex(ProtoUtil.stringOrNullFromProto(proto.getUnsignedPayoutTxHex())); tradePeer.setPayoutTxFee(BigInteger.valueOf(proto.getPayoutTxFee())); tradePeer.setPayoutAmount(BigInteger.valueOf(proto.getPayoutAmount())); + + String depositsConfirmedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDepositsConfirmedMessageState()); + MessageState depositsConfirmedMessageState = ProtoUtil.enumFromProto(MessageState.class, depositsConfirmedMessageStateString); + tradePeer.setDepositsConfirmedMessageState(depositsConfirmedMessageState); + + String paymentSentMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageState()); + MessageState paymentSentMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateString); + tradePeer.setPaymentSentMessageState(paymentSentMessageState); + + String paymentReceivedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentReceivedMessageState()); + MessageState paymentReceivedMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateString); + tradePeer.setPaymentReceivedMessageState(paymentReceivedMessageState); + return tradePeer; } } diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 08c99570f6..512db73983 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -252,6 +252,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService(); if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this); handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); + + // reprocess applicable messages + trade.reprocessApplicableMessages(); } // send deposits confirmed message if applicable @@ -281,6 +284,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D }, trade.getId()); } + public boolean needsToResendPaymentReceivedMessages() { + return false; // seller protocol overrides + } + public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) { if (trade.isShutDownStarted()) return; ThreadUtils.execute(() -> { @@ -291,7 +298,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D return; } - log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId()); + log.warn("Reprocessing PaymentSentMessage for {} {}", trade.getClass().getSimpleName(), trade.getId()); handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError); } }, trade.getId()); @@ -307,7 +314,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D return; } - log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId()); + log.warn("Reprocessing PaymentReceivedMessage for {} {}", trade.getClass().getSimpleName(), trade.getId()); handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); } }, trade.getId()); @@ -710,47 +717,76 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D private void onAckMessage(AckMessage ackMessage, NodeAddress sender) { - // handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time - if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) { - if (trade.getTradePeer(sender) == trade.getSeller()) { - processModel.setPaymentSentAckMessageSeller(ackMessage); - trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG); - processModel.getTradeManager().requestPersistence(); - } else if (trade.getTradePeer(sender) == trade.getArbitrator()) { - processModel.setPaymentSentAckMessageArbitrator(ackMessage); - processModel.getTradeManager().requestPersistence(); - } else if (!ackMessage.isSuccess()) { - String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage(); - log.warn(err); - return; // log error and ignore nack if not seller - } + // get trade peer + TradePeer peer = trade.getTradePeer(sender); + if (peer == null) { + if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator(); + else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker(); + else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker(); + } + if (peer == null) { + if (ackMessage.isSuccess()) log.warn("Received AckMessage from unknown peer for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid()); + else log.warn("Received AckMessage with error state from unknown peer for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage()); + return; } - if (ackMessage.isSuccess()) { - log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid()); + // update sender's node address + if (!peer.getNodeAddress().equals(sender)) { + log.info("Updating peer's node address from {} to {} using ACK message to {}", peer.getNodeAddress(), sender, ackMessage.getSourceMsgClassName()); + peer.setNodeAddress(sender); + } - // handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time - if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) { - TradePeer peer = trade.getTradePeer(sender); - if (peer == null) { - - // get the applicable peer based on the sourceUid - if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator(); - else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker(); - else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker(); - } - if (peer == null) log.warn("Received AckMesage for DepositsConfirmedMessage for unknown peer: " + sender); - else peer.setDepositsConfirmedMessageAcked(true); - } - } else { - log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage()); - - // set trade state on deposit request nack - if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) { + // set trade state on deposit request nack + if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) { + if (!ackMessage.isSuccess()) { trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); processModel.getTradeManager().requestPersistence(); } + } + // handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time + if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) { + peer.setDepositsConfirmedAckMessage(ackMessage); + processModel.getTradeManager().requestPersistence(); + } + + // handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time + if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) { + if (trade.getTradePeer(sender) == trade.getSeller()) { + trade.getSeller().setPaymentSentAckMessage(ackMessage); + if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG); + else trade.setState(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG); + processModel.getTradeManager().requestPersistence(); + } else if (trade.getTradePeer(sender) == trade.getArbitrator()) { + trade.getArbitrator().setPaymentSentAckMessage(ackMessage); + processModel.getTradeManager().requestPersistence(); + } else { + log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage()); + return; + } + } + + // handle ack for PaymentReceivedMessage, which automatically re-sends if not ACKed in a certain time + if (ackMessage.getSourceMsgClassName().equals(PaymentReceivedMessage.class.getSimpleName())) { + if (trade.getTradePeer(sender) == trade.getBuyer()) { + trade.getBuyer().setPaymentReceivedAckMessage(ackMessage); + if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_RECEIVED_PAYMENT_RECEIVED_MSG); + else trade.setState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG); + processModel.getTradeManager().requestPersistence(); + } else if (trade.getTradePeer(sender) == trade.getArbitrator()) { + trade.getArbitrator().setPaymentReceivedAckMessage(ackMessage); + processModel.getTradeManager().requestPersistence(); + } else { + log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage()); + return; + } + } + + // generic handling + if (ackMessage.isSuccess()) { + log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid()); + } else { + log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage()); handleError(ackMessage.getErrorMessage()); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java index bc064399a3..86bb957577 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java @@ -142,26 +142,26 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask @Override protected void setStateSent() { - if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); + getReceiver().setPaymentSentMessageState(MessageState.SENT); tryToSendAgainLater(); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateArrived() { - trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG); + getReceiver().setPaymentSentMessageState(MessageState.ARRIVED); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateStoredInMailbox() { - trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG); + getReceiver().setPaymentSentMessageState(MessageState.STORED_IN_MAILBOX); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateFault() { - trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG); + getReceiver().setPaymentSentMessageState(MessageState.FAILED); processModel.getTradeManager().requestPersistence(); } @@ -170,7 +170,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask timer.stop(); } if (listener != null) { - processModel.getPaymentSentMessageStatePropertySeller().removeListener(listener); + trade.getSeller().getPaymentReceivedMessageStateProperty().removeListener(listener); } } @@ -185,7 +185,6 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask return; } - log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); if (timer != null) { timer.stop(); } @@ -194,8 +193,8 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask if (resendCounter == 0) { listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); - processModel.getPaymentSentMessageStatePropertySeller().addListener(listener); - onMessageStateChange(processModel.getPaymentSentMessageStatePropertySeller().get()); + getReceiver().getPaymentSentMessageStateProperty().addListener(listener); + onMessageStateChange(getReceiver().getPaymentSentMessageStateProperty().get()); } // first re-send is after 2 minutes, then increase the delay exponentially @@ -212,12 +211,12 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask } private void onMessageStateChange(MessageState newValue) { - if (newValue == MessageState.ACKNOWLEDGED) { - trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG); - processModel.getTradeManager().requestPersistence(); + if (isAckedByReceiver()) { cleanup(); } } - protected abstract boolean isAckedByReceiver(); + protected boolean isAckedByReceiver() { + return getReceiver().isPaymentSentMessageAcked(); + } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java index cd3098737a..9fea701200 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java @@ -38,26 +38,7 @@ public class BuyerSendPaymentSentMessageToArbitrator extends BuyerSendPaymentSen @Override protected void setStateSent() { + super.setStateSent(); complete(); // don't wait for message to arbitrator } - - @Override - protected void setStateFault() { - // state only updated on seller message - } - - @Override - protected void setStateStoredInMailbox() { - // state only updated on seller message - } - - @Override - protected void setStateArrived() { - // state only updated on seller message - } - - @Override - protected boolean isAckedByReceiver() { - return trade.getProcessModel().isPaymentSentMessageAckedByArbitrator(); - } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java index 825220d5b4..57ca170455 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java @@ -18,7 +18,6 @@ package haveno.core.trade.protocol.tasks; import haveno.common.taskrunner.TaskRunner; -import haveno.core.network.MessageState; import haveno.core.trade.Trade; import haveno.core.trade.messages.TradeMessage; import haveno.core.trade.protocol.TradePeer; @@ -40,25 +39,25 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes @Override protected void setStateSent() { - trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.SENT); + if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); super.setStateSent(); } @Override protected void setStateArrived() { - trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.ARRIVED); + trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG); super.setStateArrived(); } @Override protected void setStateStoredInMailbox() { - trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.STORED_IN_MAILBOX); + trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG); super.setStateStoredInMailbox(); } @Override protected void setStateFault() { - trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.FAILED); + trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG); super.setStateFault(); } @@ -69,9 +68,4 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage); complete(); } - - @Override - protected boolean isAckedByReceiver() { - return trade.getProcessModel().isPaymentSentMessageAckedBySeller(); - } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java index f08fe87946..202d4c8c79 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java @@ -35,11 +35,15 @@ package haveno.core.trade.protocol.tasks; import com.google.common.base.Charsets; + +import haveno.common.Timer; +import haveno.common.UserThread; import haveno.common.crypto.PubKeyRing; import haveno.common.crypto.Sig; import haveno.common.taskrunner.TaskRunner; import haveno.core.account.sign.SignedWitness; import haveno.core.account.witness.AccountAgeWitnessService; +import haveno.core.network.MessageState; import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; import haveno.core.trade.messages.PaymentReceivedMessage; @@ -47,15 +51,23 @@ import haveno.core.trade.messages.TradeMailboxMessage; import haveno.core.trade.protocol.TradePeer; import haveno.core.util.JsonUtil; import haveno.network.p2p.NodeAddress; +import javafx.beans.value.ChangeListener; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import static com.google.common.base.Preconditions.checkArgument; +import java.util.concurrent.TimeUnit; + @Slf4j @EqualsAndHashCode(callSuper = true) public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessageTask { - SignedWitness signedWitness = null; + private SignedWitness signedWitness = null; + private ChangeListener<MessageState> listener; + private Timer timer; + private static final int MAX_RESEND_ATTEMPTS = 20; + private int delayInMin = 10; + private int resendCounter = 0; public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) { super(taskHandler, trade); @@ -77,6 +89,13 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag protected void run() { try { runInterceptHook(); + + // skip if already received + if (isReceived()) { + if (!isCompleted()) complete(); + return; + } + super.run(); } catch (Throwable t) { failed(t); @@ -134,29 +153,85 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag @Override protected void setStateSent() { - trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG); log.info("{} sent: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); + getReceiver().setPaymentReceivedMessageState(MessageState.SENT); + tryToSendAgainLater(); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateFault() { - trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG); log.error("{} failed: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); + getReceiver().setPaymentReceivedMessageState(MessageState.FAILED); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateStoredInMailbox() { - trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG); log.info("{} stored in mailbox: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); + getReceiver().setPaymentReceivedMessageState(MessageState.STORED_IN_MAILBOX); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateArrived() { - trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG); log.info("{} arrived: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); + getReceiver().setPaymentReceivedMessageState(MessageState.ARRIVED); processModel.getTradeManager().requestPersistence(); } + + private void cleanup() { + if (timer != null) { + timer.stop(); + } + if (listener != null) { + trade.getBuyer().getPaymentReceivedMessageStateProperty().removeListener(listener); + } + } + + private void tryToSendAgainLater() { + + // skip if already received + if (isReceived()) return; + + if (resendCounter >= MAX_RESEND_ATTEMPTS) { + cleanup(); + log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message."); + return; + } + + if (timer != null) { + timer.stop(); + } + + timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + + if (resendCounter == 0) { + listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); + getReceiver().getPaymentReceivedMessageStateProperty().addListener(listener); + onMessageStateChange(getReceiver().getPaymentReceivedMessageStateProperty().get()); + } + + // first re-send is after 2 minutes, then increase the delay exponentially + if (resendCounter == 0) { + int shortDelay = 2; + log.info("We will send the message again to the peer after a delay of {} min.", shortDelay); + timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES); + } else { + log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); + timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + delayInMin = (int) ((double) delayInMin * 1.5); + } + resendCounter++; + } + + private void onMessageStateChange(MessageState newValue) { + if (isReceived()) { + cleanup(); + } + } + + protected boolean isReceived() { + return getReceiver().isPaymentReceivedMessageReceived(); + } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java index 7228b40307..212dcb22f4 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java @@ -37,6 +37,30 @@ public class SellerSendPaymentReceivedMessageToBuyer extends SellerSendPaymentRe return trade.getBuyer(); } + @Override + protected void setStateSent() { + trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG); + super.setStateSent(); + } + + @Override + protected void setStateFault() { + trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG); + super.setStateFault(); + } + + @Override + protected void setStateStoredInMailbox() { + trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG); + super.setStateStoredInMailbox(); + } + + @Override + protected void setStateArrived() { + trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG); + super.setStateArrived(); + } + // continue execution on fault so payment received message is sent to arbitrator @Override protected void onFault(String errorMessage, TradeMessage message) { diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java index fb17d60d4d..ba20d74351 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java @@ -23,6 +23,7 @@ import haveno.common.Timer; import haveno.common.UserThread; import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; +import haveno.core.network.MessageState; import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; import haveno.core.trade.messages.DepositsConfirmedMessage; @@ -52,8 +53,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas try { runInterceptHook(); - // skip if already acked by receiver - if (isAckedByReceiver()) { + // skip if already acked or payout published + if (isAckedByReceiver() || trade.isPayoutPublished()) { if (!isCompleted()) complete(); return; } @@ -64,11 +65,17 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas } } - @Override - protected abstract NodeAddress getReceiverNodeAddress(); + protected abstract TradePeer getReceiver(); @Override - protected abstract PubKeyRing getReceiverPubKeyRing(); + protected NodeAddress getReceiverNodeAddress() { + return getReceiver().getNodeAddress(); + } + + @Override + protected PubKeyRing getReceiverPubKeyRing() { + return getReceiver().getPubKeyRing(); + } @Override protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) { @@ -97,23 +104,24 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas @Override protected void setStateSent() { + getReceiver().setDepositsConfirmedMessageState(MessageState.SENT); tryToSendAgainLater(); processModel.getTradeManager().requestPersistence(); } @Override protected void setStateArrived() { - // no additional handling + getReceiver().setDepositsConfirmedMessageState(MessageState.ARRIVED); } @Override protected void setStateStoredInMailbox() { - // no additional handling + getReceiver().setDepositsConfirmedMessageState(MessageState.STORED_IN_MAILBOX); } @Override protected void setStateFault() { - // no additional handling + getReceiver().setDepositsConfirmedMessageState(MessageState.FAILED); } private void cleanup() { @@ -151,7 +159,6 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas } private boolean isAckedByReceiver() { - TradePeer peer = trade.getTradePeer(getReceiverNodeAddress()); - return peer.isDepositsConfirmedMessageAcked(); + return getReceiver().isDepositsConfirmedMessageAcked(); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java index baaa6ae987..ae8a171aa8 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java @@ -17,10 +17,9 @@ package haveno.core.trade.protocol.tasks; -import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; -import haveno.network.p2p.NodeAddress; +import haveno.core.trade.protocol.TradePeer; import lombok.extern.slf4j.Slf4j; /** @@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToArbitrator extends SendDepositsConfir } @Override - public NodeAddress getReceiverNodeAddress() { - return trade.getArbitrator().getNodeAddress(); - } - - @Override - public PubKeyRing getReceiverPubKeyRing() { - return trade.getArbitrator().getPubKeyRing(); + protected TradePeer getReceiver() { + return trade.getArbitrator(); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java index 5795ce8947..bf1212c9a8 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java @@ -17,10 +17,9 @@ package haveno.core.trade.protocol.tasks; -import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; -import haveno.network.p2p.NodeAddress; +import haveno.core.trade.protocol.TradePeer; import lombok.extern.slf4j.Slf4j; /** @@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToBuyer extends SendDepositsConfirmedMe } @Override - public NodeAddress getReceiverNodeAddress() { - return trade.getBuyer().getNodeAddress(); - } - - @Override - public PubKeyRing getReceiverPubKeyRing() { - return trade.getBuyer().getPubKeyRing(); + protected TradePeer getReceiver() { + return trade.getBuyer(); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java index efdf9a99cd..4ea097fd2b 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java @@ -17,10 +17,9 @@ package haveno.core.trade.protocol.tasks; -import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; -import haveno.network.p2p.NodeAddress; +import haveno.core.trade.protocol.TradePeer; import lombok.extern.slf4j.Slf4j; /** @@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToSeller extends SendDepositsConfirmedM } @Override - public NodeAddress getReceiverNodeAddress() { - return trade.getSeller().getNodeAddress(); - } - - @Override - public PubKeyRing getReceiverPubKeyRing() { - return trade.getSeller().getPubKeyRing(); + protected TradePeer getReceiver() { + return trade.getSeller(); } } diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java index b9936236c6..b19d6dcc26 100644 --- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java +++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java @@ -200,7 +200,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad payoutStateSubscription = EasyBind.subscribe(trade.payoutStateProperty(), state -> { onPayoutStateChanged(state); }); - messageStateSubscription = EasyBind.subscribe(trade.getProcessModel().getPaymentSentMessageStatePropertySeller(), this::onPaymentSentMessageStateChanged); + messageStateSubscription = EasyBind.subscribe(trade.getSeller().getPaymentSentMessageStateProperty(), this::onPaymentSentMessageStateChanged); } } } @@ -425,6 +425,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad case SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG: case SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG: case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG: + case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG: sellerState.set(trade.isPayoutPublished() ? SellerState.STEP4 : SellerState.STEP3); break; diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java index c5cfcbcdb5..c6fe0cab23 100644 --- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java +++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java @@ -135,6 +135,7 @@ public class SellerStep3View extends TradeStepView { statusLabel.setText(Res.get("shared.messageStoredInMailbox")); break; case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG: + case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG: busyAnimation.stop(); statusLabel.setText(Res.get("shared.messageArrived")); break; diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto index 9bca09b668..a814f71e4a 100644 --- a/proto/src/main/proto/pb.proto +++ b/proto/src/main/proto/pb.proto @@ -1465,6 +1465,7 @@ message Trade { SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG = 24; SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG = 25; SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG = 26; + BUYER_RECEIVED_PAYMENT_RECEIVED_MSG = 27; } enum Phase { @@ -1568,8 +1569,8 @@ message ProcessModel { bytes payout_tx_signature = 4; bool use_savings_wallet = 5; int64 funds_needed_for_trade = 6; - string payment_sent_message_state_seller = 7; - string payment_sent_message_state_arbitrator = 8; + string payment_sent_message_state_seller = 7 [deprecated = true]; + string payment_sent_message_state_arbitrator = 8 [deprecated = true]; bytes maker_signature = 9; TradePeer maker = 10; TradePeer taker = 11; @@ -1613,7 +1614,7 @@ message TradePeer { string made_multisig_hex = 31; string exchanged_multisig_hex = 32; string updated_multisig_hex = 33; - bool deposits_confirmed_message_acked = 34; + bool deposits_confirmed_message_acked = 34 [deprecated = true]; string deposit_tx_hash = 35; string deposit_tx_hex = 36; string deposit_tx_key = 37; @@ -1622,6 +1623,9 @@ message TradePeer { string unsigned_payout_tx_hex = 40; int64 payout_tx_fee = 41; int64 payout_amount = 42; + string deposits_confirmed_message_state = 43; + string payment_sent_message_state = 44; + string payment_received_message_state = 45; } ///////////////////////////////////////////////////////////////////////////////////////////