diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index ee2bd0ccbb..44aaab6184 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import common.utils.GenUtils; import haveno.common.ClockWatcher; import haveno.common.crypto.KeyRing; +import haveno.common.crypto.PubKeyRing; import haveno.common.handlers.ErrorMessageHandler; import haveno.common.handlers.FaultHandler; import haveno.common.handlers.ResultHandler; @@ -69,11 +70,14 @@ import haveno.core.user.User; import haveno.core.util.Validator; import haveno.core.xmr.model.XmrAddressEntry; import haveno.core.xmr.wallet.XmrWalletService; +import haveno.network.p2p.AckMessage; +import haveno.network.p2p.AckMessageSourceType; import haveno.network.p2p.BootstrapListener; import haveno.network.p2p.DecryptedDirectMessageListener; import haveno.network.p2p.DecryptedMessageWithPubKey; import haveno.network.p2p.NodeAddress; import haveno.network.p2p.P2PService; +import haveno.network.p2p.SendMailboxMessageListener; import haveno.network.p2p.network.TorNetworkNode; import javafx.beans.property.BooleanProperty; import javafx.beans.property.LongProperty; @@ -523,47 +527,52 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi boolean isArbitrator = request.getArbitratorNodeAddress().equals(p2PService.getNetworkNode().getNodeAddress()); if (isArbitrator) { - // verify this node is registered arbitrator - Arbitrator thisArbitrator = user.getRegisteredArbitrator(); - NodeAddress thisAddress = p2PService.getNetworkNode().getNodeAddress(); - if (thisArbitrator == null || !thisArbitrator.getNodeAddress().equals(thisAddress)) { - log.warn("Ignoring InitTradeRequest from {} with tradeId {} because we are not an arbitrator", sender, request.getTradeId()); - return; - } + // verify this node is registered arbitrator + Arbitrator thisArbitrator = user.getRegisteredArbitrator(); + NodeAddress thisAddress = p2PService.getNetworkNode().getNodeAddress(); + if (thisArbitrator == null || !thisArbitrator.getNodeAddress().equals(thisAddress)) { + log.warn("Ignoring InitTradeRequest from {} with tradeId {} because we are not an arbitrator", sender, request.getTradeId()); + return; + } - // get offer associated with trade - Offer offer = null; - for (Offer anOffer : offerBookService.getOffers()) { - if (anOffer.getId().equals(request.getTradeId())) { - offer = anOffer; - } - } - if (offer == null) { - log.warn("Ignoring InitTradeRequest from {} with tradeId {} because offer is not on the books", sender, request.getTradeId()); - return; - } + // get offer associated with trade + Offer offer = null; + for (Offer anOffer : offerBookService.getOffers()) { + if (anOffer.getId().equals(request.getTradeId())) { + offer = anOffer; + } + } + if (offer == null) { + log.warn("Ignoring InitTradeRequest from {} with tradeId {} because offer is not on the books", sender, request.getTradeId()); + return; + } - // verify arbitrator is payload signer unless they are offline - // TODO (woodser): handle if payload signer differs from current arbitrator (verify signer is offline) + // verify arbitrator is payload signer unless they are offline + // TODO (woodser): handle if payload signer differs from current arbitrator (verify signer is offline) - // verify maker is offer owner - // TODO (woodser): maker address might change if they disconnect and reconnect, should allow maker address to differ if pubKeyRing is same ? - if (!offer.getOwnerNodeAddress().equals(request.getMakerNodeAddress())) { - log.warn("Ignoring InitTradeRequest from {} with tradeId {} because maker is not offer owner", sender, request.getTradeId()); - return; - } + // verify maker is offer owner + // TODO (woodser): maker address might change if they disconnect and reconnect, should allow maker address to differ if pubKeyRing is same? + if (!offer.getOwnerNodeAddress().equals(request.getMakerNodeAddress())) { + log.warn("Ignoring InitTradeRequest from {} with tradeId {} because maker is not offer owner", sender, request.getTradeId()); + return; + } - Trade trade; - Optional tradeOptional = getOpenTrade(offer.getId()); - if (tradeOptional.isPresent()) { - trade = tradeOptional.get(); + // handle trade + Trade trade; + Optional tradeOptional = getOpenTrade(offer.getId()); + if (tradeOptional.isPresent()) { + trade = tradeOptional.get(); - // verify request is from maker - if (!sender.equals(request.getMakerNodeAddress())) { - log.warn("Trade is already taken"); // TODO (woodser): need to respond with bad ack - return; - } - } else { + // verify request is from maker + if (!sender.equals(request.getMakerNodeAddress())) { + + // send nack if trade already taken + String errMsg = "Trade is already taken, tradeId=" + request.getTradeId(); + log.warn(errMsg); + sendAckMessage(sender, request.getPubKeyRing(), request, false, errMsg); + return; + } + } else { // verify request is from taker if (!sender.equals(request.getTakerNodeAddress())) { @@ -600,6 +609,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } } + // process with protocol ((ArbitratorProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> { log.warn("Arbitrator error during trade initialization for trade {}: {}", trade.getId(), errorMessage); maybeRemoveTradeOnError(trade); @@ -643,6 +653,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // get expected taker fee BigInteger takerFee = HavenoUtils.getTakerFee(BigInteger.valueOf(request.getTradeAmount())); + // initialize trade Trade trade; if (offer.isBuyOffer()) trade = new BuyerAsMakerTrade(offer, @@ -687,6 +698,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } }); + // process with protocol ((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> { log.warn("Maker error during trade initialization: " + errorMessage); maybeRemoveTradeOnError(trade); @@ -872,7 +884,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi initTradeAndProtocol(trade, tradeProtocol); - // take offer and persist trade on success + // process with protocol ((TakerProtocol) tradeProtocol).onTakeOffer(result -> { tradeResultHandler.handleResult(trade); requestPersistence(); @@ -1115,6 +1127,48 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // Getters, Utils /////////////////////////////////////////////////////////////////////////////////////////// + public void sendAckMessage(NodeAddress peer, PubKeyRing peersPubKeyRing, TradeMessage message, boolean result, @Nullable String errorMessage) { + + // create ack message + String tradeId = message.getTradeId(); + String sourceUid = message.getUid(); + AckMessage ackMessage = new AckMessage(P2PService.getMyNodeAddress(), + AckMessageSourceType.TRADE_MESSAGE, + message.getClass().getSimpleName(), + sourceUid, + tradeId, + result, + errorMessage); + + // send ack message + log.info("Send AckMessage for {} to peer {}. tradeId={}, sourceUid={}", + ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid); + p2PService.getMailboxMessageService().sendEncryptedMailboxMessage( + peer, + peersPubKeyRing, + ackMessage, + new SendMailboxMessageListener() { + @Override + public void onArrived() { + log.info("AckMessage for {} arrived at peer {}. tradeId={}, sourceUid={}", + ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid); + } + + @Override + public void onStoredInMailbox() { + log.info("AckMessage for {} stored in mailbox for peer {}. tradeId={}, sourceUid={}", + ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid); + } + + @Override + public void onFault(String errorMessage) { + log.error("AckMessage for {} failed. Peer {}. tradeId={}, sourceUid={}, errorMessage={}", + ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid, errorMessage); + } + } + ); + } + public ObservableList getObservableList() { synchronized (tradableList) { return tradableList.getObservableList(); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index e240c699ee..63eda10281 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -62,7 +62,6 @@ import haveno.network.p2p.AckMessageSourceType; import haveno.network.p2p.DecryptedDirectMessageListener; import haveno.network.p2p.DecryptedMessageWithPubKey; import haveno.network.p2p.NodeAddress; -import haveno.network.p2p.SendMailboxMessageListener; import haveno.network.p2p.mailbox.MailboxMessage; import haveno.network.p2p.mailbox.MailboxMessageService; import haveno.network.p2p.messaging.DecryptedMailboxListener; @@ -666,7 +665,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D processModel.getTradeManager().requestPersistence(); } - handleError(err); + handleError(ackMessage.getErrorMessage()); } } @@ -679,44 +678,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D return; } - String tradeId = message.getTradeId(); - String sourceUid = message.getUid(); - AckMessage ackMessage = new AckMessage(processModel.getMyNodeAddress(), - AckMessageSourceType.TRADE_MESSAGE, - message.getClass().getSimpleName(), - sourceUid, - tradeId, - result, - errorMessage); - - log.info("Send AckMessage for {} to peer {}. tradeId={}, sourceUid={}", - ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid); - processModel.getP2PService().getMailboxMessageService().sendEncryptedMailboxMessage( - peer, - peersPubKeyRing, - ackMessage, - new SendMailboxMessageListener() { - @Override - public void onArrived() { - log.info("AckMessage for {} arrived at peer {}. tradeId={}, sourceUid={}", - ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid); - } - - @Override - public void onStoredInMailbox() { - log.info("AckMessage for {} stored in mailbox for peer {}. tradeId={}, sourceUid={}", - ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid); - } - - @Override - public void onFault(String errorMessage) { - log.error("AckMessage for {} failed. Peer {}. tradeId={}, sourceUid={}, errorMessage={}", - ackMessage.getSourceMsgClassName(), peer, tradeId, sourceUid, errorMessage); - } - } - ); + // send ack message + processModel.getTradeManager().sendAckMessage(peer, peersPubKeyRing, message, result, errorMessage); } + /////////////////////////////////////////////////////////////////////////////////////////// // Timeout ///////////////////////////////////////////////////////////////////////////////////////////