diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java
index 13594cc3a1..72d77de7d6 100644
--- a/core/src/main/java/haveno/core/trade/Trade.java
+++ b/core/src/main/java/haveno/core/trade/Trade.java
@@ -46,7 +46,6 @@ import haveno.common.taskrunner.Model;
 import haveno.common.util.Utilities;
 import haveno.core.monetary.Price;
 import haveno.core.monetary.Volume;
-import haveno.core.network.MessageState;
 import haveno.core.offer.Offer;
 import haveno.core.offer.OfferDirection;
 import haveno.core.offer.OpenOffer;
@@ -195,7 +194,8 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
         SELLER_SENT_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
         SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
         SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
-        SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
+        SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
+        BUYER_RECEIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
 
         @NotNull
         public Phase getPhase() {
@@ -603,12 +603,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
         }
     }
 
-      // notified from TradeProtocol of ack messages
-      public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
+    // notified from TradeProtocol of ack messages
+    public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
         for (TradeListener listener : new ArrayList<TradeListener>(tradeListeners)) {  // copy array to allow listener invocation to unregister listener without concurrent modification exception
             listener.onAckMessage(ackMessage, sender);
         }
-      }
+    }
 
 
     ///////////////////////////////////////////////////////////////////////////////////////////
@@ -618,8 +618,9 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
     public void initialize(ProcessModelServiceProvider serviceProvider) {
         if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
 
-        // done if payout unlocked and marked complete
-        if (isPayoutUnlocked() && isCompleted()) {
+        // skip initialization if trade is complete
+        // starting in v1.0.19, seller resends payment received message until acked or stored in mailbox
+        if (isPayoutUnlocked() && isCompleted() && !getProtocol().needsToResendPaymentReceivedMessages()) {
             clearAndShutDown();
             return;
         }
@@ -733,15 +734,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
             xmrWalletService.addWalletListener(idlePayoutSyncer);
         }
 
-        // TODO: buyer's payment sent message state property became unsynced if shut down while awaiting ack from seller. fixed in v1.0.19 so this check can be removed?
-        if (isBuyer()) {
-            MessageState expectedState = getPaymentSentMessageState();
-            if (expectedState != null && expectedState != processModel.getPaymentSentMessageStatePropertySeller().get()) {
-                log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStatePropertySeller().get());
-                processModel.getPaymentSentMessageStatePropertySeller().set(expectedState);
-            }
-        }
-
         // handle confirmations
         walletHeight.addListener((observable, oldValue, newValue) -> {
             importMultisigHexIfScheduled();
@@ -771,11 +763,20 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
             }
         }
 
-        // start polling if deposit requested
-        if (isDepositRequested()) tryInitPolling();
+        // init syncing if deposit requested
+        if (isDepositRequested()) {
+            tryInitSyncing();
+        }
         isFullyInitialized = true;
     }
 
+    public void reprocessApplicableMessages() {
+        if (!isDepositRequested() || isPayoutUnlocked() || isCompleted()) return;
+        getProtocol().maybeReprocessPaymentSentMessage(false);
+        getProtocol().maybeReprocessPaymentReceivedMessage(false);
+        HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);
+    }
+
     public void awaitInitialized() {
         while (!isFullyInitialized) HavenoUtils.waitFor(100); // TODO: use proper notification and refactor isInitialized, fullyInitialized, and arbitrator idling
     }
@@ -1535,7 +1536,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
             peer.setUpdatedMultisigHex(null);
             peer.setDisputeClosedMessage(null);
             peer.setPaymentSentMessage(null);
-            peer.setPaymentReceivedMessage(null);
+            if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null);
         }
     }
 
@@ -2049,25 +2050,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
         throw new IllegalArgumentException("Trade is not buyer, seller, or arbitrator");
     }
 
-    public MessageState getPaymentSentMessageState() {
-        if (isPaymentReceived()) return MessageState.ACKNOWLEDGED;
-        if (processModel.getPaymentSentMessageStatePropertySeller().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED;
-        switch (state) {
-            case BUYER_SENT_PAYMENT_SENT_MSG:
-                return MessageState.SENT;
-            case BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG:
-                return MessageState.ARRIVED;
-            case BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG:
-                return MessageState.STORED_IN_MAILBOX;
-            case SELLER_RECEIVED_PAYMENT_SENT_MSG:
-                return MessageState.ACKNOWLEDGED;
-            case BUYER_SEND_FAILED_PAYMENT_SENT_MSG:
-                return MessageState.FAILED;
-            default:
-                return null;
-        }
-    }
-
     public String getPeerRole(TradePeer peer) {
         if (peer == getBuyer()) return "Buyer";
         if (peer == getSeller()) return "Seller";
@@ -2444,11 +2426,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
 
             // sync and reprocess messages on new thread
             if (isInitialized && connection != null && !Boolean.FALSE.equals(xmrConnectionService.isConnected())) {
-                ThreadUtils.execute(() -> tryInitPolling(), getId());
+                ThreadUtils.execute(() -> tryInitSyncing(), getId());
             }
         }
     }
-    private void tryInitPolling() {
+
+    private void tryInitSyncing() {
         if (isShutDownStarted) return;
 
         // set known deposit txs
@@ -2457,24 +2440,18 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
 
         // start polling
         if (!isIdling()) {
-            tryInitPollingAux();
+            doTryInitSyncing();
         }  else {
             long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getPollPeriod()); // random time to start polling
             UserThread.runAfter(() -> ThreadUtils.execute(() -> {
-                if (!isShutDownStarted) tryInitPollingAux();
+                if (!isShutDownStarted) doTryInitSyncing();
             }, getId()), startSyncingInMs / 1000l);
         }
     }
     
-    private void tryInitPollingAux() {
+    private void doTryInitSyncing() {
         if (!wasWalletSynced) trySyncWallet(true);
         updatePollPeriod();
-        
-        // reprocess pending messages
-        getProtocol().maybeReprocessPaymentSentMessage(false);
-        getProtocol().maybeReprocessPaymentReceivedMessage(false);
-        HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);
-
         startPolling();
     }
 
@@ -2825,7 +2802,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
         if (!isShutDownStarted) wallet = getWallet();
         restartInProgress = false;
         pollWallet();
-        if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId());
+        if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitSyncing(), getId());
     }
 
     private void setStateDepositsSeen() {
diff --git a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java
index 8521174ca8..209e28afd5 100644
--- a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java
+++ b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java
@@ -44,6 +44,7 @@ import haveno.core.account.witness.AccountAgeWitnessService;
 import haveno.core.filter.FilterManager;
 import haveno.core.network.MessageState;
 import haveno.core.offer.Offer;
+import haveno.core.offer.OfferDirection;
 import haveno.core.offer.OpenOfferManager;
 import haveno.core.payment.PaymentAccount;
 import haveno.core.payment.payload.PaymentAccountPayload;
@@ -73,6 +74,9 @@ import org.bitcoinj.core.Coin;
 import org.bitcoinj.core.Transaction;
 
 import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 
 // Fields marked as transient are only used during protocol execution which are based on directMessages so we do not
@@ -161,15 +165,11 @@ public class ProcessModel implements Model, PersistablePayload {
     @Getter
     @Setter
     private boolean importMultisigHexScheduled;
-
-    // We want to indicate the user the state of the message delivery of the
-    // PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet.
-    // To enable that even after restart we persist the state.
-    @Setter
-    private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
-    @Setter
-    private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
     private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false);
+    @Deprecated
+    private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
+    @Deprecated
+    private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
 
     public ProcessModel(String offerId, String accountId, PubKeyRing pubKeyRing) {
         this(offerId, accountId, pubKeyRing, new TradePeer(), new TradePeer(), new TradePeer());
@@ -191,6 +191,31 @@ public class ProcessModel implements Model, PersistablePayload {
         this.offer = offer;
         this.provider = provider;
         this.tradeManager = tradeManager;
+        for (TradePeer peer : getTradePeers()) {
+            peer.applyTransient(tradeManager);
+        }
+
+        // migrate deprecated fields to new model for v1.0.19
+        if (paymentSentMessageStatePropertySeller.get() != MessageState.UNDEFINED && getSeller().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) {
+            getSeller().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertySeller.get());
+            tradeManager.requestPersistence();
+        }
+        if (paymentSentMessageStatePropertyArbitrator.get() != MessageState.UNDEFINED && getArbitrator().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) {
+            getArbitrator().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertyArbitrator.get());
+            tradeManager.requestPersistence();
+        }
+    }
+
+    private List<TradePeer> getTradePeers() {
+        return Arrays.asList(maker, taker, arbitrator);
+    }
+
+    private TradePeer getBuyer() {
+        return offer.getDirection() == OfferDirection.BUY ? maker : taker;
+    }
+
+    private TradePeer getSeller() {
+        return offer.getDirection() == OfferDirection.BUY ? taker : maker;
     }
 
 
@@ -245,14 +270,13 @@ public class ProcessModel implements Model, PersistablePayload {
         processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress()));
         processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress()));
 
+        // deprecated fields need to be read in order to migrate to new fields
         String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller());
         MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString);
-        processModel.setPaymentSentMessageStateSeller(paymentSentMessageStateSeller);
-
+        processModel.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateSeller);
         String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator());
         MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString);
-        processModel.setPaymentSentMessageStateArbitrator(paymentSentMessageStateArbitrator);
-
+        processModel.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateArbitrator);
         return processModel;
     }
 
@@ -279,40 +303,8 @@ public class ProcessModel implements Model, PersistablePayload {
         return getP2PService().getAddress();
     }
 
-    void setPaymentSentAckMessageSeller(AckMessage ackMessage) {
-        MessageState messageState = ackMessage.isSuccess() ?
-                MessageState.ACKNOWLEDGED :
-                MessageState.FAILED;
-        setPaymentSentMessageStateSeller(messageState);
-    }
-
-    void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) {
-        MessageState messageState = ackMessage.isSuccess() ?
-                MessageState.ACKNOWLEDGED :
-                MessageState.FAILED;
-        setPaymentSentMessageStateArbitrator(messageState);
-    }
-
-    public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) {
-        this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty);
-        if (tradeManager != null) {
-            tradeManager.requestPersistence();
-        }
-    }
-
-    public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessageStateProperty) {
-        this.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateProperty);
-        if (tradeManager != null) {
-            tradeManager.requestPersistence();
-        }
-    }
-
-    public boolean isPaymentSentMessageAckedBySeller() {
-        return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED;
-    }
-
-    public boolean isPaymentSentMessageAckedByArbitrator() {
-        return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED;
+    public boolean isPaymentReceivedMessagesReceived() {
+        return getArbitrator().isPaymentReceivedMessageReceived() && getBuyer().isPaymentReceivedMessageReceived();
     }
 
     void setDepositTxSentAckMessage(AckMessage ackMessage) {
diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java
index 2b7f45877a..4daa800229 100644
--- a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java
@@ -53,6 +53,9 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class SellerProtocol extends DisputeProtocol {
+    
+    private static final long RESEND_PAYMENT_RECEIVED_MSGS_AFTER = 1741629525730L; // Mar 10 2025 17:58 UTC
+
     enum SellerEvent implements FluentProtocol.Event {
         STARTUP,
         DEPOSIT_TXS_CONFIRMED,
@@ -69,31 +72,37 @@ public class SellerProtocol extends DisputeProtocol {
 
         // re-send payment received message if payout not published
         ThreadUtils.execute(() -> {
-            if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
+            if (!needsToResendPaymentReceivedMessages()) return;
             synchronized (trade.getLock()) {
-                if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
-                if (trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.isPayoutPublished()) {
-                    latchTrade();
-                    given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
-                        .with(SellerEvent.STARTUP))
-                        .setup(tasks(
-                                SellerSendPaymentReceivedMessageToBuyer.class,
-                                SellerSendPaymentReceivedMessageToArbitrator.class)
-                        .using(new TradeTaskRunner(trade,
-                                () -> {
-                                    unlatchTrade();
-                                },
-                                (errorMessage) -> {
-                                    log.warn("Error sending PaymentReceivedMessage on startup: " + errorMessage);
-                                    unlatchTrade();
-                                })))
-                        .executeTasks();
-                    awaitTradeLatch();
-                }
+                if (!needsToResendPaymentReceivedMessages()) return;
+                latchTrade();
+                given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
+                    .with(SellerEvent.STARTUP))
+                    .setup(tasks(
+                            SellerSendPaymentReceivedMessageToBuyer.class,
+                            SellerSendPaymentReceivedMessageToArbitrator.class)
+                    .using(new TradeTaskRunner(trade,
+                            () -> {
+                                unlatchTrade();
+                            },
+                            (errorMessage) -> {
+                                log.warn("Error sending PaymentReceivedMessage on startup: " + errorMessage);
+                                unlatchTrade();
+                            })))
+                    .executeTasks();
+                awaitTradeLatch();
             }
         }, trade.getId());
     }
 
+    public boolean needsToResendPaymentReceivedMessages() {
+        return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled();
+    }
+
+    private boolean resendPaymentReceivedMessagesEnabled() {
+        return trade.getTakeOfferDate().getTime() > RESEND_PAYMENT_RECEIVED_MSGS_AFTER;
+    }
+
     @Override
     protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
         super.onTradeMessage(message, peer);
diff --git a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java
index eeef2d4daf..11c035a329 100644
--- a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java
+++ b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java
@@ -24,12 +24,17 @@ import haveno.common.crypto.PubKeyRing;
 import haveno.common.proto.ProtoUtil;
 import haveno.common.proto.persistable.PersistablePayload;
 import haveno.core.account.witness.AccountAgeWitness;
+import haveno.core.network.MessageState;
 import haveno.core.payment.payload.PaymentAccountPayload;
 import haveno.core.proto.CoreProtoResolver;
 import haveno.core.support.dispute.messages.DisputeClosedMessage;
+import haveno.core.trade.TradeManager;
 import haveno.core.trade.messages.PaymentReceivedMessage;
 import haveno.core.trade.messages.PaymentSentMessage;
+import haveno.network.p2p.AckMessage;
 import haveno.network.p2p.NodeAddress;
+import javafx.beans.property.ObjectProperty;
+import javafx.beans.property.SimpleObjectProperty;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -57,6 +62,7 @@ public final class TradePeer implements PersistablePayload {
     @Nullable
     transient private byte[] preparedDepositTx;
     transient private MoneroTxWallet depositTx;
+    transient private TradeManager tradeManager;
 
     // Persistable mutable
     @Nullable
@@ -96,7 +102,6 @@ public final class TradePeer implements PersistablePayload {
     @Getter
     private DisputeClosedMessage disputeClosedMessage;
 
-
     // added in v 0.6
     @Nullable
     private byte[] accountAgeWitnessNonce;
@@ -142,13 +147,32 @@ public final class TradePeer implements PersistablePayload {
     private long payoutAmount;
     @Nullable
     private String updatedMultisigHex;
-    @Getter
+    @Deprecated
+    private boolean depositsConfirmedMessageAcked;
+    
+    // We want to indicate the user the state of the message delivery of the payment
+    // confirmation messages. We do an automatic re-send in case it was not ACKed yet.
+    // To enable that even after restart we persist the state.
     @Setter
-    boolean depositsConfirmedMessageAcked;
+    private ObjectProperty<MessageState> depositsConfirmedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
+    @Setter
+    private ObjectProperty<MessageState> paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
+    @Setter
+    private ObjectProperty<MessageState> paymentReceivedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
 
     public TradePeer() {
     }
 
+    public void applyTransient(TradeManager tradeManager) {
+        this.tradeManager = tradeManager;
+
+        // migrate deprecated fields to new model for v1.0.19
+        if (depositsConfirmedMessageAcked && depositsConfirmedMessageStateProperty.get() == MessageState.UNDEFINED) {
+            depositsConfirmedMessageStateProperty.set(MessageState.ACKNOWLEDGED);
+            tradeManager.requestPersistence();
+        }
+    }
+
     public BigInteger getDepositTxFee() {
         return BigInteger.valueOf(depositTxFee);
     }
@@ -181,6 +205,60 @@ public final class TradePeer implements PersistablePayload {
         this.payoutAmount = payoutAmount.longValueExact();
     }
 
+    void setDepositsConfirmedAckMessage(AckMessage ackMessage) {
+        MessageState messageState = ackMessage.isSuccess() ?
+                MessageState.ACKNOWLEDGED :
+                MessageState.FAILED;
+        setDepositsConfirmedMessageState(messageState);
+    }
+
+    void setPaymentSentAckMessage(AckMessage ackMessage) {
+        MessageState messageState = ackMessage.isSuccess() ?
+                MessageState.ACKNOWLEDGED :
+                MessageState.FAILED;
+        setPaymentSentMessageState(messageState);
+    }
+
+    void setPaymentReceivedAckMessage(AckMessage ackMessage) {
+        MessageState messageState = ackMessage.isSuccess() ?
+                MessageState.ACKNOWLEDGED :
+                MessageState.FAILED;
+        setPaymentReceivedMessageState(messageState);
+    }
+
+    public void setDepositsConfirmedMessageState(MessageState depositsConfirmedMessageStateProperty) {
+        this.depositsConfirmedMessageStateProperty.set(depositsConfirmedMessageStateProperty);
+        if (tradeManager != null) {
+            tradeManager.requestPersistence();
+        }
+    }
+
+    public void setPaymentSentMessageState(MessageState paymentSentMessageStateProperty) {
+        this.paymentSentMessageStateProperty.set(paymentSentMessageStateProperty);
+        if (tradeManager != null) {
+            tradeManager.requestPersistence();
+        }
+    }
+
+    public void setPaymentReceivedMessageState(MessageState paymentReceivedMessageStateProperty) {
+        this.paymentReceivedMessageStateProperty.set(paymentReceivedMessageStateProperty);
+        if (tradeManager != null) {
+            tradeManager.requestPersistence();
+        }
+    }
+
+    public boolean isDepositsConfirmedMessageAcked() {
+        return depositsConfirmedMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
+    }
+
+    public boolean isPaymentSentMessageAcked() {
+        return paymentSentMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
+    }
+
+    public boolean isPaymentReceivedMessageReceived() {
+        return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
+    }
+
     @Override
     public Message toProtoMessage() {
         final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder();
@@ -221,6 +299,9 @@ public final class TradePeer implements PersistablePayload {
         Optional.ofNullable(payoutTxFee).ifPresent(e -> builder.setPayoutTxFee(payoutTxFee));
         Optional.ofNullable(payoutAmount).ifPresent(e -> builder.setPayoutAmount(payoutAmount));
         builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked);
+        builder.setDepositsConfirmedMessageState(depositsConfirmedMessageStateProperty.get().name());
+        builder.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name());
+        builder.setPaymentReceivedMessageState(paymentReceivedMessageStateProperty.get().name());
 
         builder.setCurrentDate(currentDate);
         return builder.build();
@@ -270,6 +351,19 @@ public final class TradePeer implements PersistablePayload {
             tradePeer.setUnsignedPayoutTxHex(ProtoUtil.stringOrNullFromProto(proto.getUnsignedPayoutTxHex()));
             tradePeer.setPayoutTxFee(BigInteger.valueOf(proto.getPayoutTxFee()));
             tradePeer.setPayoutAmount(BigInteger.valueOf(proto.getPayoutAmount()));
+
+            String depositsConfirmedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDepositsConfirmedMessageState());
+            MessageState depositsConfirmedMessageState = ProtoUtil.enumFromProto(MessageState.class, depositsConfirmedMessageStateString);
+            tradePeer.setDepositsConfirmedMessageState(depositsConfirmedMessageState);
+
+            String paymentSentMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageState());
+            MessageState paymentSentMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateString);
+            tradePeer.setPaymentSentMessageState(paymentSentMessageState);
+
+            String paymentReceivedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentReceivedMessageState());
+            MessageState paymentReceivedMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateString);
+            tradePeer.setPaymentReceivedMessageState(paymentReceivedMessageState);
+
             return tradePeer;
         }
     }
diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
index 08c99570f6..512db73983 100644
--- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
+++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
@@ -252,6 +252,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
             MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
             if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
             handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
+
+            // reprocess applicable messages
+            trade.reprocessApplicableMessages();
         }
 
         // send deposits confirmed message if applicable
@@ -281,6 +284,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
         }, trade.getId());
     }
 
+    public boolean needsToResendPaymentReceivedMessages() {
+        return false; // seller protocol overrides
+    }
+
     public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) {
         if (trade.isShutDownStarted()) return;
         ThreadUtils.execute(() -> {
@@ -291,7 +298,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
                     return;
                 }
 
-                log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId());
+                log.warn("Reprocessing PaymentSentMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
                 handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError);
             }
         }, trade.getId());
@@ -307,7 +314,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
                     return;
                 }
 
-                log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
+                log.warn("Reprocessing PaymentReceivedMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
                 handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
             }
         }, trade.getId());
@@ -710,47 +717,76 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
 
     private void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
 
-        // handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time
-        if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) {
-            if (trade.getTradePeer(sender) == trade.getSeller()) {
-                processModel.setPaymentSentAckMessageSeller(ackMessage);
-                trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
-                processModel.getTradeManager().requestPersistence();
-            } else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
-                processModel.setPaymentSentAckMessageArbitrator(ackMessage);
-                processModel.getTradeManager().requestPersistence();
-            } else if (!ackMessage.isSuccess()) {
-                String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage();
-                log.warn(err);
-                return; // log error and ignore nack if not seller
-            }
+        // get trade peer
+        TradePeer peer = trade.getTradePeer(sender);
+        if (peer == null) {
+            if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator();
+            else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker();
+            else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker();
+        }
+        if (peer == null) {
+            if (ackMessage.isSuccess()) log.warn("Received AckMessage from unknown peer for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
+            else log.warn("Received AckMessage with error state from unknown peer for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
+            return;
         }
 
-        if (ackMessage.isSuccess()) {
-            log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
+        // update sender's node address
+        if (!peer.getNodeAddress().equals(sender)) {
+            log.info("Updating peer's node address from {} to {} using ACK message to {}", peer.getNodeAddress(), sender, ackMessage.getSourceMsgClassName());
+            peer.setNodeAddress(sender);
+        }
 
-            // handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
-            if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
-                TradePeer peer = trade.getTradePeer(sender);
-                if (peer == null) {
-
-                    // get the applicable peer based on the sourceUid
-                    if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator();
-                    else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker();
-                    else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker();
-                }
-                if (peer == null) log.warn("Received AckMesage for DepositsConfirmedMessage for unknown peer: " + sender);
-                else peer.setDepositsConfirmedMessageAcked(true);
-            }
-        } else {
-            log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
-
-            // set trade state on deposit request nack
-            if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) {
+        // set trade state on deposit request nack
+        if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) {
+            if (!ackMessage.isSuccess()) {
                 trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
                 processModel.getTradeManager().requestPersistence();
             }
+        }
 
+        // handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
+        if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
+            peer.setDepositsConfirmedAckMessage(ackMessage);
+            processModel.getTradeManager().requestPersistence();
+        }
+
+        // handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time
+        if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) {
+            if (trade.getTradePeer(sender) == trade.getSeller()) {
+                trade.getSeller().setPaymentSentAckMessage(ackMessage);
+                if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
+                else trade.setState(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
+                processModel.getTradeManager().requestPersistence();
+            } else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
+                trade.getArbitrator().setPaymentSentAckMessage(ackMessage);
+                processModel.getTradeManager().requestPersistence();
+            } else {
+                log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
+                return;
+            }
+        }
+
+        // handle ack for PaymentReceivedMessage, which automatically re-sends if not ACKed in a certain time
+        if (ackMessage.getSourceMsgClassName().equals(PaymentReceivedMessage.class.getSimpleName())) {
+            if (trade.getTradePeer(sender) == trade.getBuyer()) {
+                trade.getBuyer().setPaymentReceivedAckMessage(ackMessage);
+                if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_RECEIVED_PAYMENT_RECEIVED_MSG);
+                else trade.setState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
+                processModel.getTradeManager().requestPersistence();
+            } else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
+                trade.getArbitrator().setPaymentReceivedAckMessage(ackMessage);
+                processModel.getTradeManager().requestPersistence();
+            } else {
+                log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
+                return;
+            }
+        }
+
+        // generic handling
+        if (ackMessage.isSuccess()) {
+            log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
+        } else {
+            log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
             handleError(ackMessage.getErrorMessage());
         }
     }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java
index bc064399a3..86bb957577 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java
@@ -142,26 +142,26 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
 
     @Override
     protected void setStateSent() {
-        if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
+        getReceiver().setPaymentSentMessageState(MessageState.SENT);
         tryToSendAgainLater();
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateArrived() {
-        trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
+        getReceiver().setPaymentSentMessageState(MessageState.ARRIVED);
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateStoredInMailbox() {
-        trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG);
+        getReceiver().setPaymentSentMessageState(MessageState.STORED_IN_MAILBOX);
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateFault() {
-        trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
+        getReceiver().setPaymentSentMessageState(MessageState.FAILED);
         processModel.getTradeManager().requestPersistence();
     }
 
@@ -170,7 +170,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
             timer.stop();
         }
         if (listener != null) {
-            processModel.getPaymentSentMessageStatePropertySeller().removeListener(listener);
+            trade.getSeller().getPaymentReceivedMessageStateProperty().removeListener(listener);
         }
     }
 
@@ -185,7 +185,6 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
             return;
         }
 
-        log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
         if (timer != null) {
             timer.stop();
         }
@@ -194,8 +193,8 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
 
         if (resendCounter == 0) {
             listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
-            processModel.getPaymentSentMessageStatePropertySeller().addListener(listener);
-            onMessageStateChange(processModel.getPaymentSentMessageStatePropertySeller().get());
+            getReceiver().getPaymentSentMessageStateProperty().addListener(listener);
+            onMessageStateChange(getReceiver().getPaymentSentMessageStateProperty().get());
         }
 
         // first re-send is after 2 minutes, then increase the delay exponentially
@@ -212,12 +211,12 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
     }
 
     private void onMessageStateChange(MessageState newValue) {
-        if (newValue == MessageState.ACKNOWLEDGED) {
-            trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
-            processModel.getTradeManager().requestPersistence();
+        if (isAckedByReceiver()) {
             cleanup();
         }
     }
 
-    protected abstract boolean isAckedByReceiver();
+    protected boolean isAckedByReceiver() {
+        return getReceiver().isPaymentSentMessageAcked();
+    }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java
index cd3098737a..9fea701200 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToArbitrator.java
@@ -38,26 +38,7 @@ public class BuyerSendPaymentSentMessageToArbitrator extends BuyerSendPaymentSen
 
     @Override
     protected void setStateSent() {
+        super.setStateSent();
         complete(); // don't wait for message to arbitrator
     }
-
-    @Override
-    protected void setStateFault() {
-        // state only updated on seller message
-    }
-
-    @Override
-    protected void setStateStoredInMailbox() {
-        // state only updated on seller message
-    }
-
-    @Override
-    protected void setStateArrived() {
-        // state only updated on seller message
-    }
-
-    @Override
-    protected boolean isAckedByReceiver() {
-        return trade.getProcessModel().isPaymentSentMessageAckedByArbitrator();
-    }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java
index 825220d5b4..57ca170455 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java
@@ -18,7 +18,6 @@
 package haveno.core.trade.protocol.tasks;
 
 import haveno.common.taskrunner.TaskRunner;
-import haveno.core.network.MessageState;
 import haveno.core.trade.Trade;
 import haveno.core.trade.messages.TradeMessage;
 import haveno.core.trade.protocol.TradePeer;
@@ -40,25 +39,25 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
     
     @Override
     protected void setStateSent() {
-        trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.SENT);
+        if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
         super.setStateSent();
     }
 
     @Override
     protected void setStateArrived() {
-        trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.ARRIVED);
+        trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
         super.setStateArrived();
     }
 
     @Override
     protected void setStateStoredInMailbox() {
-        trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.STORED_IN_MAILBOX);
+        trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG);
         super.setStateStoredInMailbox();
     }
 
     @Override
     protected void setStateFault() {
-        trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.FAILED);
+        trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
         super.setStateFault();
     }
 
@@ -69,9 +68,4 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
         appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
         complete();
     }
-
-    @Override
-    protected boolean isAckedByReceiver() {
-        return trade.getProcessModel().isPaymentSentMessageAckedBySeller();
-    }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java
index f08fe87946..202d4c8c79 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java
@@ -35,11 +35,15 @@
 package haveno.core.trade.protocol.tasks;
 
 import com.google.common.base.Charsets;
+
+import haveno.common.Timer;
+import haveno.common.UserThread;
 import haveno.common.crypto.PubKeyRing;
 import haveno.common.crypto.Sig;
 import haveno.common.taskrunner.TaskRunner;
 import haveno.core.account.sign.SignedWitness;
 import haveno.core.account.witness.AccountAgeWitnessService;
+import haveno.core.network.MessageState;
 import haveno.core.trade.HavenoUtils;
 import haveno.core.trade.Trade;
 import haveno.core.trade.messages.PaymentReceivedMessage;
@@ -47,15 +51,23 @@ import haveno.core.trade.messages.TradeMailboxMessage;
 import haveno.core.trade.protocol.TradePeer;
 import haveno.core.util.JsonUtil;
 import haveno.network.p2p.NodeAddress;
+import javafx.beans.value.ChangeListener;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.concurrent.TimeUnit;
+
 @Slf4j
 @EqualsAndHashCode(callSuper = true)
 public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessageTask {
-    SignedWitness signedWitness = null;
+    private SignedWitness signedWitness = null;
+    private ChangeListener<MessageState> listener;
+    private Timer timer;
+    private static final int MAX_RESEND_ATTEMPTS = 20;
+    private int delayInMin = 10;
+    private int resendCounter = 0;
 
     public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
         super(taskHandler, trade);
@@ -77,6 +89,13 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
     protected void run() {
         try {
             runInterceptHook();
+
+            // skip if already received
+            if (isReceived()) {
+                if (!isCompleted()) complete();
+                return;
+            }
+
             super.run();
         } catch (Throwable t) {
             failed(t);
@@ -134,29 +153,85 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
 
     @Override
     protected void setStateSent() {
-        trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
         log.info("{} sent: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
+        getReceiver().setPaymentReceivedMessageState(MessageState.SENT);
+        tryToSendAgainLater();
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateFault() {
-        trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
         log.error("{} failed: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
+        getReceiver().setPaymentReceivedMessageState(MessageState.FAILED);
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateStoredInMailbox() {
-        trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG);
         log.info("{} stored in mailbox: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
+        getReceiver().setPaymentReceivedMessageState(MessageState.STORED_IN_MAILBOX);
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateArrived() {
-        trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG);
         log.info("{} arrived: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
+        getReceiver().setPaymentReceivedMessageState(MessageState.ARRIVED);
         processModel.getTradeManager().requestPersistence();
     }
+
+    private void cleanup() {
+        if (timer != null) {
+            timer.stop();
+        }
+        if (listener != null) {
+            trade.getBuyer().getPaymentReceivedMessageStateProperty().removeListener(listener);
+        }
+    }
+
+    private void tryToSendAgainLater() {
+
+        // skip if already received
+        if (isReceived()) return;
+
+        if (resendCounter >= MAX_RESEND_ATTEMPTS) {
+            cleanup();
+            log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message.");
+            return;
+        }
+
+        if (timer != null) {
+            timer.stop();
+        }
+
+        timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
+
+        if (resendCounter == 0) {
+            listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
+            getReceiver().getPaymentReceivedMessageStateProperty().addListener(listener);
+            onMessageStateChange(getReceiver().getPaymentReceivedMessageStateProperty().get());
+        }
+
+        // first re-send is after 2 minutes, then increase the delay exponentially
+        if (resendCounter == 0) {
+            int shortDelay = 2;
+            log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
+            timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
+        } else {
+            log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
+            timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
+            delayInMin = (int) ((double) delayInMin * 1.5);
+        }
+        resendCounter++;
+    }
+
+    private void onMessageStateChange(MessageState newValue) {
+        if (isReceived()) {
+            cleanup();
+        }
+    }
+
+    protected boolean isReceived() {
+        return getReceiver().isPaymentReceivedMessageReceived();
+    }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java
index 7228b40307..212dcb22f4 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java
@@ -37,6 +37,30 @@ public class SellerSendPaymentReceivedMessageToBuyer extends SellerSendPaymentRe
         return trade.getBuyer();
     }
 
+    @Override
+    protected void setStateSent() {
+        trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
+        super.setStateSent();
+    }
+
+    @Override
+    protected void setStateFault() {
+        trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
+        super.setStateFault();
+    }
+
+    @Override
+    protected void setStateStoredInMailbox() {
+        trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG);
+        super.setStateStoredInMailbox();
+    }
+
+    @Override
+    protected void setStateArrived() {
+        trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG);
+        super.setStateArrived();
+    }
+
     // continue execution on fault so payment received message is sent to arbitrator
     @Override
     protected void onFault(String errorMessage, TradeMessage message) {
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java
index fb17d60d4d..ba20d74351 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java
@@ -23,6 +23,7 @@ import haveno.common.Timer;
 import haveno.common.UserThread;
 import haveno.common.crypto.PubKeyRing;
 import haveno.common.taskrunner.TaskRunner;
+import haveno.core.network.MessageState;
 import haveno.core.trade.HavenoUtils;
 import haveno.core.trade.Trade;
 import haveno.core.trade.messages.DepositsConfirmedMessage;
@@ -52,8 +53,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
         try {
             runInterceptHook();
 
-            // skip if already acked by receiver
-            if (isAckedByReceiver()) {
+            // skip if already acked or payout published
+            if (isAckedByReceiver() || trade.isPayoutPublished()) {
                 if (!isCompleted()) complete();
                 return;
             }
@@ -64,11 +65,17 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
         }
     }
 
-    @Override
-    protected abstract NodeAddress getReceiverNodeAddress();
+    protected abstract TradePeer getReceiver();
 
     @Override
-    protected abstract PubKeyRing getReceiverPubKeyRing();
+    protected NodeAddress getReceiverNodeAddress() {
+        return getReceiver().getNodeAddress();
+    }
+
+    @Override
+    protected PubKeyRing getReceiverPubKeyRing() {
+        return getReceiver().getPubKeyRing();
+    }
 
     @Override
     protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
@@ -97,23 +104,24 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
 
     @Override
     protected void setStateSent() {
+        getReceiver().setDepositsConfirmedMessageState(MessageState.SENT);
         tryToSendAgainLater();
         processModel.getTradeManager().requestPersistence();
     }
 
     @Override
     protected void setStateArrived() {
-        // no additional handling
+        getReceiver().setDepositsConfirmedMessageState(MessageState.ARRIVED);
     }
 
     @Override
     protected void setStateStoredInMailbox() {
-        // no additional handling
+        getReceiver().setDepositsConfirmedMessageState(MessageState.STORED_IN_MAILBOX);
     }
 
     @Override
     protected void setStateFault() {
-        // no additional handling
+        getReceiver().setDepositsConfirmedMessageState(MessageState.FAILED);
     }
 
     private void cleanup() {
@@ -151,7 +159,6 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
     }
 
     private boolean isAckedByReceiver() {
-        TradePeer peer = trade.getTradePeer(getReceiverNodeAddress());
-        return peer.isDepositsConfirmedMessageAcked();
+        return getReceiver().isDepositsConfirmedMessageAcked();
     }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java
index baaa6ae987..ae8a171aa8 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToArbitrator.java
@@ -17,10 +17,9 @@
 
 package haveno.core.trade.protocol.tasks;
 
-import haveno.common.crypto.PubKeyRing;
 import haveno.common.taskrunner.TaskRunner;
 import haveno.core.trade.Trade;
-import haveno.network.p2p.NodeAddress;
+import haveno.core.trade.protocol.TradePeer;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToArbitrator extends SendDepositsConfir
     }
 
     @Override
-    public NodeAddress getReceiverNodeAddress() {
-        return trade.getArbitrator().getNodeAddress();
-    }
-
-    @Override
-    public PubKeyRing getReceiverPubKeyRing() {
-        return trade.getArbitrator().getPubKeyRing();
+    protected TradePeer getReceiver() {
+        return trade.getArbitrator();
     }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java
index 5795ce8947..bf1212c9a8 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToBuyer.java
@@ -17,10 +17,9 @@
 
 package haveno.core.trade.protocol.tasks;
 
-import haveno.common.crypto.PubKeyRing;
 import haveno.common.taskrunner.TaskRunner;
 import haveno.core.trade.Trade;
-import haveno.network.p2p.NodeAddress;
+import haveno.core.trade.protocol.TradePeer;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToBuyer extends SendDepositsConfirmedMe
     }
 
     @Override
-    public NodeAddress getReceiverNodeAddress() {
-        return trade.getBuyer().getNodeAddress();
-    }
-
-    @Override
-    public PubKeyRing getReceiverPubKeyRing() {
-        return trade.getBuyer().getPubKeyRing();
+    protected TradePeer getReceiver() {
+        return trade.getBuyer();
     }
 }
diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java
index efdf9a99cd..4ea097fd2b 100644
--- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java
+++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessageToSeller.java
@@ -17,10 +17,9 @@
 
 package haveno.core.trade.protocol.tasks;
 
-import haveno.common.crypto.PubKeyRing;
 import haveno.common.taskrunner.TaskRunner;
 import haveno.core.trade.Trade;
-import haveno.network.p2p.NodeAddress;
+import haveno.core.trade.protocol.TradePeer;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToSeller extends SendDepositsConfirmedM
     }
 
     @Override
-    public NodeAddress getReceiverNodeAddress() {
-        return trade.getSeller().getNodeAddress();
-    }
-
-    @Override
-    public PubKeyRing getReceiverPubKeyRing() {
-        return trade.getSeller().getPubKeyRing();
+    protected TradePeer getReceiver() {
+        return trade.getSeller();
     }
 }
diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java
index b9936236c6..b19d6dcc26 100644
--- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java
+++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesViewModel.java
@@ -200,7 +200,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
                 payoutStateSubscription = EasyBind.subscribe(trade.payoutStateProperty(), state -> {
                     onPayoutStateChanged(state);
                 });
-                messageStateSubscription = EasyBind.subscribe(trade.getProcessModel().getPaymentSentMessageStatePropertySeller(), this::onPaymentSentMessageStateChanged);
+                messageStateSubscription = EasyBind.subscribe(trade.getSeller().getPaymentSentMessageStateProperty(), this::onPaymentSentMessageStateChanged);
             }
         }
     }
@@ -425,6 +425,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
             case SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG:
             case SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG:
             case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG:
+            case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG:
                 sellerState.set(trade.isPayoutPublished() ? SellerState.STEP4 : SellerState.STEP3);
                 break;
 
diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java
index c5cfcbcdb5..c6fe0cab23 100644
--- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java
+++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/steps/seller/SellerStep3View.java
@@ -135,6 +135,7 @@ public class SellerStep3View extends TradeStepView {
                         statusLabel.setText(Res.get("shared.messageStoredInMailbox"));
                         break;
                     case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG:
+                    case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG:
                         busyAnimation.stop();
                         statusLabel.setText(Res.get("shared.messageArrived"));
                         break;
diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto
index 9bca09b668..a814f71e4a 100644
--- a/proto/src/main/proto/pb.proto
+++ b/proto/src/main/proto/pb.proto
@@ -1465,6 +1465,7 @@ message Trade {
         SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG = 24;
         SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG = 25;
         SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG = 26;
+        BUYER_RECEIVED_PAYMENT_RECEIVED_MSG = 27;
     }
 
     enum Phase {
@@ -1568,8 +1569,8 @@ message ProcessModel {
     bytes payout_tx_signature = 4;
     bool use_savings_wallet = 5;
     int64 funds_needed_for_trade = 6;
-    string payment_sent_message_state_seller = 7;
-    string payment_sent_message_state_arbitrator = 8;
+    string payment_sent_message_state_seller = 7 [deprecated = true];
+    string payment_sent_message_state_arbitrator = 8 [deprecated = true];
     bytes maker_signature = 9;
     TradePeer maker = 10;
     TradePeer taker = 11;
@@ -1613,7 +1614,7 @@ message TradePeer {
     string made_multisig_hex = 31;
     string exchanged_multisig_hex = 32;
     string updated_multisig_hex = 33;
-    bool deposits_confirmed_message_acked = 34;
+    bool deposits_confirmed_message_acked = 34 [deprecated = true];
     string deposit_tx_hash = 35;
     string deposit_tx_hex = 36;
     string deposit_tx_key = 37;
@@ -1622,6 +1623,9 @@ message TradePeer {
     string unsigned_payout_tx_hex = 40;
     int64 payout_tx_fee = 41;
     int64 payout_amount = 42;
+    string deposits_confirmed_message_state = 43;
+    string payment_sent_message_state = 44;
+    string payment_received_message_state = 45;
 }
 
 ///////////////////////////////////////////////////////////////////////////////////////////