diff --git a/core/src/main/java/bisq/core/trade/Trade.java b/core/src/main/java/bisq/core/trade/Trade.java index 547ef028be..a57a59b829 100644 --- a/core/src/main/java/bisq/core/trade/Trade.java +++ b/core/src/main/java/bisq/core/trade/Trade.java @@ -39,6 +39,7 @@ import bisq.core.util.ParsingUtils; import bisq.core.util.VolumeUtil; import bisq.network.p2p.AckMessage; import bisq.network.p2p.NodeAddress; +import bisq.network.p2p.P2PService; import bisq.common.UserThread; import bisq.common.crypto.PubKeyRing; import bisq.common.proto.ProtoUtil; @@ -647,6 +648,13 @@ public abstract class Trade implements Tradable, Model { // API /////////////////////////////////////////////////////////////////////////////////////////// + public void setMyNodeAddress() { + if (this instanceof MakerTrade) makerNodeAddress = P2PService.getMyNodeAddress(); + else if (this instanceof TakerTrade) takerNodeAddress = P2PService.getMyNodeAddress(); + else if (this instanceof ArbitratorTrade) arbitratorNodeAddress = P2PService.getMyNodeAddress(); + else throw new RuntimeException("Must be maker, taker, or arbitrator to set own address"); + } + public void setTradingPeerNodeAddress(NodeAddress peerAddress) { if (this instanceof MakerTrade) takerNodeAddress = peerAddress; else if (this instanceof TakerTrade) makerNodeAddress = peerAddress; diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java index 4f9d4ea9b0..d77758be80 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java @@ -52,7 +52,9 @@ public abstract class BuyerProtocol extends DisputeProtocol { @Override protected void onInitialized() { super.onInitialized(); - + + // TODO: run with trade lock and latch, otherwise getting invalid transition warnings on startup after offline trades + given(phase(Trade.Phase.DEPOSITS_PUBLISHED) .with(BuyerEvent.STARTUP)) .setup(tasks(SetupDepositTxsListener.class)) @@ -90,25 +92,30 @@ public abstract class BuyerProtocol extends DisputeProtocol { latchTrade(); this.errorMessageHandler = errorMessageHandler; BuyerEvent event = BuyerEvent.PAYMENT_SENT; - expect(phase(Trade.Phase.DEPOSITS_UNLOCKED) - .with(event) - .preCondition(trade.confirmPermitted())) - .setup(tasks(ApplyFilter.class, - //UpdateMultisigWithTradingPeer.class, // TODO (woodser): can use this to test protocol with updated multisig from peer. peer should attempt to send updated multisig hex earlier as part of protocol. cannot use with countdown latch because response comes back in a separate thread and blocks on trade - BuyerPreparesPaymentSentMessage.class, - //BuyerSetupPayoutTxListener.class, - BuyerSendsPaymentSentMessage.class) // don't latch trade because this blocks and runs in background - .using(new TradeTaskRunner(trade, - () -> { - this.errorMessageHandler = null; - handleTaskRunnerSuccess(event); - resultHandler.handleResult(); - }, - (errorMessage) -> { - handleTaskRunnerFault(event, errorMessage); - }))) - .run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT)) - .executeTasks(true); + try { + expect(anyPhase(Trade.Phase.DEPOSITS_UNLOCKED, Trade.Phase.PAYMENT_SENT) + .with(event) + .preCondition(trade.confirmPermitted())) + .setup(tasks(ApplyFilter.class, + //UpdateMultisigWithTradingPeer.class, // TODO (woodser): can use this to test protocol with updated multisig from peer. peer should attempt to send updated multisig hex earlier as part of protocol. cannot use with countdown latch because response comes back in a separate thread and blocks on trade + BuyerPreparesPaymentSentMessage.class, + //BuyerSetupPayoutTxListener.class, + BuyerSendsPaymentSentMessage.class) // don't latch trade because this blocks and runs in background + .using(new TradeTaskRunner(trade, + () -> { + this.errorMessageHandler = null; + resultHandler.handleResult(); + handleTaskRunnerSuccess(event); + }, + (errorMessage) -> { + handleTaskRunnerFault(event, errorMessage); + }))) + .run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT)) + .executeTasks(true); + } catch (Exception e) { + errorMessageHandler.handleErrorMessage("Error confirming payment sent: " + e.getMessage()); + unlatchTrade(); + } awaitTradeLatch(); } }).start(); diff --git a/core/src/main/java/bisq/core/trade/protocol/FluentProtocol.java b/core/src/main/java/bisq/core/trade/protocol/FluentProtocol.java index 5150196b3a..b87951d6ec 100644 --- a/core/src/main/java/bisq/core/trade/protocol/FluentProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/FluentProtocol.java @@ -288,11 +288,12 @@ public class FluentProtocol { event.name() + " event" : ""; if (isPhaseValid) { - String info = MessageFormat.format("We received a {0} at phase {1} and state {2}, tradeId={3}", + String info = MessageFormat.format("We received a {0} at phase {1} and state {2}, tradeId={3}, peer={4}", trigger, trade.getPhase(), trade.getState(), - trade.getId()); + trade.getId(), + this.peer); log.info(info); return Result.VALID.info(info); } else { diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java index 6ac5401f88..8596dea3ea 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java @@ -48,6 +48,8 @@ public abstract class SellerProtocol extends DisputeProtocol { protected void onInitialized() { super.onInitialized(); + // TODO: run with trade lock and latch, otherwise getting invalid transition warnings on startup after offline trades + given(phase(Trade.Phase.DEPOSITS_PUBLISHED) .with(BuyerEvent.STARTUP)) .setup(tasks(SetupDepositTxsListener.class)) @@ -124,22 +126,27 @@ public abstract class SellerProtocol extends DisputeProtocol { latchTrade(); this.errorMessageHandler = errorMessageHandler; SellerEvent event = SellerEvent.PAYMENT_RECEIVED; - expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED) - .with(event) - .preCondition(trade.confirmPermitted())) - .setup(tasks( - ApplyFilter.class, - SellerPreparesPaymentReceivedMessage.class, - SellerSendsPaymentReceivedMessage.class) - .using(new TradeTaskRunner(trade, () -> { - this.errorMessageHandler = null; - handleTaskRunnerSuccess(event); - resultHandler.handleResult(); - }, (errorMessage) -> { - handleTaskRunnerFault(event, errorMessage); - }))) - .run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_PAYMENT_RECEIPT)) - .executeTasks(true); + try { + expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED) + .with(event) + .preCondition(trade.confirmPermitted())) + .setup(tasks( + ApplyFilter.class, + SellerPreparesPaymentReceivedMessage.class, + SellerSendsPaymentReceivedMessage.class) + .using(new TradeTaskRunner(trade, () -> { + this.errorMessageHandler = null; + handleTaskRunnerSuccess(event); + resultHandler.handleResult(); + }, (errorMessage) -> { + handleTaskRunnerFault(event, errorMessage); + }))) + .run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_PAYMENT_RECEIPT)) + .executeTasks(true); + } catch (Exception e) { + errorMessageHandler.handleErrorMessage("Error confirming payment received: " + e.getMessage()); + unlatchTrade(); + } awaitTradeLatch(); } }).start(); diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index 5e5280d25d..56ae9a844f 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -250,7 +250,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { - System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId()); + System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest() " + trade.getId()); synchronized (trade) { Validator.checkTradeId(processModel.getOfferId(), message); if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { @@ -566,11 +566,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D /////////////////////////////////////////////////////////////////////////////////////////// private PubKeyRing getPeersPubKeyRing(NodeAddress peer) { + trade.setMyNodeAddress(); // TODO: this is a hack to update my node address before verifying the message if (peer.equals(trade.getArbitratorNodeAddress())) return trade.getArbitratorPubKeyRing(); else if (peer.equals(trade.getMakerNodeAddress())) return trade.getMakerPubKeyRing(); else if (peer.equals(trade.getTakerNodeAddress())) return trade.getTakerPubKeyRing(); else { - log.error("Cannot get peer's pub key ring because peer is not maker, taker, or arbitrator"); + log.warn("Cannot get peer's pub key ring because peer is not maker, taker, or arbitrator. Their address might have changed: " + peer); return null; } } @@ -582,16 +583,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } private boolean isPubKeyValid(DecryptedMessageWithPubKey message, NodeAddress sender) { - // We can only validate the peers pubKey if we have it already. If we are the taker we get it from the offer - // Otherwise it depends on the state of the trade protocol if we have received the peers pubKeyRing already. - PubKeyRing peersPubKeyRing = getPeersPubKeyRing(sender); - boolean isValid = true; // TODO (woodser): this returns valid=true even if peer's pub key ring is null? - if (peersPubKeyRing != null && - !message.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) { - isValid = false; - log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer."); - } - return isValid; + + // not invalid if pub key rings are unknown + if (trade.getTradingPeer().getPubKeyRing() == null && trade.getArbitratorPubKeyRing() == null) return true; + + // valid if peer's pub key ring + if (trade.getTradingPeer().getPubKeyRing() != null && message.getSignaturePubKey().equals(trade.getTradingPeer().getPubKeyRing().getSignaturePubKey())) return true; + + // valid if arbitrator's pub key ring + if (trade.getArbitratorPubKeyRing() != null && message.getSignaturePubKey().equals(trade.getArbitratorPubKeyRing().getSignaturePubKey())) return true; + + // invalid + log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer and arbitrator."); + return false; } /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/ArbitratorProcessesDepositRequest.java b/core/src/main/java/bisq/core/trade/protocol/tasks/ArbitratorProcessesDepositRequest.java index f06b6d0e3d..eaa92b8505 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/ArbitratorProcessesDepositRequest.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/ArbitratorProcessesDepositRequest.java @@ -123,7 +123,8 @@ public class ArbitratorProcessesDepositRequest extends TradeTask { sendDepositResponse(trade.getMakerNodeAddress(), trade.getMakerPubKeyRing(), response); sendDepositResponse(trade.getTakerNodeAddress(), trade.getTakerPubKeyRing(), response); } else { - log.info("Arbitrator waiting for deposit request from maker and taker for trade " + trade.getId()); + if (processModel.getMaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from maker for trade " + trade.getId()); + if (processModel.getTaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from taker for trade " + trade.getId()); } // TODO (woodser): request persistence? diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/BuyerSendsPaymentSentMessage.java b/core/src/main/java/bisq/core/trade/protocol/tasks/BuyerSendsPaymentSentMessage.java index 10d7fca64e..0022e489c9 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/BuyerSendsPaymentSentMessage.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/BuyerSendsPaymentSentMessage.java @@ -23,15 +23,11 @@ import bisq.core.network.MessageState; import bisq.core.trade.Trade; import bisq.core.trade.messages.PaymentSentMessage; import bisq.core.trade.messages.TradeMailboxMessage; -import bisq.core.trade.messages.TradeMessage; import bisq.common.Timer; -import bisq.common.UserThread; import bisq.common.taskrunner.TaskRunner; import javafx.beans.value.ChangeListener; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; /** @@ -45,9 +41,6 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask { - private static final int MAX_RESEND_ATTEMPTS = 10; - private int delayInMin = 15; - private int resendCounter = 0; private PaymentSentMessage message; private ChangeListener listener; private Timer timer; @@ -89,46 +82,24 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask { if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) { trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); } - processModel.getTradeManager().requestPersistence(); } @Override protected void setStateArrived() { trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG); - // the message has arrived but we're ultimately waiting for an AckMessage response - if (!trade.isPayoutPublished()) { - tryToSendAgainLater(); - } - } - - // We override the default behaviour for onStoredInMailbox and do not call complete - @Override - protected void onStoredInMailbox() { - setStateStoredInMailbox(); } @Override protected void setStateStoredInMailbox() { trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG); - if (!trade.isPayoutPublished()) { - tryToSendAgainLater(); - } processModel.getTradeManager().requestPersistence(); - } - - // We override the default behaviour for onFault and do not call appendToErrorMessage and failed - @Override - protected void onFault(String errorMessage, TradeMessage message) { - setStateFault(); + // TODO: schedule repeat sending like bisq? } @Override protected void setStateFault() { trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG); - if (!trade.isPayoutPublished()) { - tryToSendAgainLater(); - } processModel.getTradeManager().requestPersistence(); } @@ -136,7 +107,6 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask { protected void run() { try { runInterceptHook(); - super.run(); } catch (Throwable t) { failed(t); @@ -145,13 +115,6 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask { } } - // complete() is called from base class SendMailboxMessageTask=>onArrived() - // We override the default behaviour for complete and keep this task open until receipt of the AckMessage - @Override - protected void complete() { - onMessageStateChange(processModel.getPaymentStartedMessageStateProperty().get()); // check for AckMessage - } - private void cleanup() { if (timer != null) { timer.stop(); @@ -160,43 +123,4 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask { processModel.getPaymentStartedMessageStateProperty().removeListener(listener); } } - - private void tryToSendAgainLater() { - if (resendCounter >= MAX_RESEND_ATTEMPTS) { - cleanup(); - log.warn("We never received an ACK message when sending the PaymentSentMessage to the peer. " + - "We stop now and complete the protocol task."); - complete(); - return; - } - - log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); - if (timer != null) { - timer.stop(); - } - timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); - - if (resendCounter == 0) { - // We want to register listener only once - listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); - processModel.getPaymentStartedMessageStateProperty().addListener(listener); - onMessageStateChange(processModel.getPaymentStartedMessageStateProperty().get()); - } - - delayInMin = delayInMin * 2; - resendCounter++; - } - - private void onMessageStateChange(MessageState newValue) { - // Once we receive an ACK from our msg we know the peer has received the msg and we stop. - if (newValue == MessageState.ACKNOWLEDGED) { - // We treat a ACK like BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG - trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG); - - processModel.getTradeManager().requestPersistence(); - - cleanup(); - super.complete(); // received AckMessage, complete this task - } - } } diff --git a/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessInitMultisigRequest.java b/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessInitMultisigRequest.java index ad0635b502..c037e63c1b 100644 --- a/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessInitMultisigRequest.java +++ b/core/src/main/java/bisq/core/trade/protocol/tasks/ProcessInitMultisigRequest.java @@ -119,12 +119,12 @@ public class ProcessInitMultisigRequest extends TradeTask { // import exchanged multisig keys if applicable if (processModel.getMultisigAddress() == null && peers[0].getExchangedMultisigHex() != null && peers[1].getExchangedMultisigHex() != null) { - log.info("Importing exchanged multisig hex for trade {}", trade.getId()); - MoneroMultisigInitResult result = multisigWallet.exchangeMultisigKeys(Arrays.asList(peers[0].getExchangedMultisigHex(), peers[1].getExchangedMultisigHex()), xmrWalletService.getWalletPassword()); - processModel.setMultisigAddress(result.getAddress()); - trade.setStateIfValidTransitionTo(Trade.State.MULTISIG_COMPLETED); - processModel.getProvider().getXmrWalletService().closeMultisigWallet(trade.getId()); // save and close multisig wallet once it's created - } + log.info("Importing exchanged multisig hex for trade {}", trade.getId()); + MoneroMultisigInitResult result = multisigWallet.exchangeMultisigKeys(Arrays.asList(peers[0].getExchangedMultisigHex(), peers[1].getExchangedMultisigHex()), xmrWalletService.getWalletPassword()); + processModel.setMultisigAddress(result.getAddress()); + processModel.getProvider().getXmrWalletService().closeMultisigWallet(trade.getId()); // save and close multisig wallet once it's created + trade.setStateIfValidTransitionTo(Trade.State.MULTISIG_COMPLETED); + } // update multisig participants if new state to communicate if (updateParticipants) { diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 69947076d5..5df7069510 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -381,7 +381,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned()); connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope()); connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> - decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)), + decryptedDirectMessageListeners.forEach(e -> { + try { + e.onDirectMessage(decryptedMsg, nodeAddress); + } catch (Exception e2) { + e2.printStackTrace(); + } + }), () -> { log.error("peersNodeAddress is expected to be available at onMessage for " + "processing PrefixedSealedAndSignedMessage.");