diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java index d77758be..fb5cfe36 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java @@ -127,25 +127,27 @@ public abstract class BuyerProtocol extends DisputeProtocol { protected void handle(PaymentReceivedMessage message, NodeAddress peer) { log.info("BuyerProtocol.handle(PaymentReceivedMessage)"); - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED) - .with(message) - .from(peer)) - .setup(tasks( - BuyerProcessesPaymentReceivedMessage.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - handleTaskRunnerFault(peer, message, errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); + expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED) + .with(message) + .from(peer)) + .setup(tasks( + BuyerProcessesPaymentReceivedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + handleTaskRunnerFault(peer, message, errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java index 8596dea3..9d175f26 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java @@ -77,42 +77,44 @@ public abstract class SellerProtocol extends DisputeProtocol { protected void handle(PaymentSentMessage message, NodeAddress peer) { log.info("SellerProtocol.handle(PaymentSentMessage)"); - // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case - // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received - // a mailbox message with PaymentSentMessage. - // TODO A better fix would be to add a listener for the wallet sync state and process - // the mailbox msg once wallet is ready and trade state set. - synchronized (trade) { - if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { - log.warn("Ignoring PaymentSentMessage which was already processed"); - return; + new Thread(() -> { + // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case + // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received + // a mailbox message with PaymentSentMessage. + // TODO A better fix would be to add a listener for the wallet sync state and process + // the mailbox msg once wallet is ready and trade state set. + synchronized (trade) { + if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { + log.warn("Ignoring PaymentSentMessage which was already processed"); + return; + } + latchTrade(); + expect(anyPhase(Trade.Phase.DEPOSITS_UNLOCKED, Trade.Phase.DEPOSITS_PUBLISHED) + .with(message) + .from(peer) + .preCondition(trade.getPayoutTx() == null, + () -> { + log.warn("We received a PaymentSentMessage but we have already created the payout tx " + + "so we ignore the message. This can happen if the ACK message to the peer did not " + + "arrive and the peer repeats sending us the message. We send another ACK msg."); + sendAckMessage(peer, message, true, null); + removeMailboxMessageAfterProcessing(message); + })) + .setup(tasks( + ApplyFilter.class, + SellerProcessesPaymentSentMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + (errorMessage) -> { + stopTimeout(); + handleTaskRunnerFault(peer, message, errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); } - latchTrade(); - expect(anyPhase(Trade.Phase.DEPOSITS_UNLOCKED, Trade.Phase.DEPOSITS_PUBLISHED) - .with(message) - .from(peer) - .preCondition(trade.getPayoutTx() == null, - () -> { - log.warn("We received a PaymentSentMessage but we have already created the payout tx " + - "so we ignore the message. This can happen if the ACK message to the peer did not " + - "arrive and the peer repeats sending us the message. We send another ACK msg."); - sendAckMessage(peer, message, true, null); - removeMailboxMessageAfterProcessing(message); - })) - .setup(tasks( - ApplyFilter.class, - SellerProcessesPaymentSentMessage.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - (errorMessage) -> { - stopTimeout(); - handleTaskRunnerFault(peer, message, errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); - } + }).start(); } ///////////////////////////////////////////////////////////////////////////////////////////