arbitrator sends deposit responses on error or timeout

This commit is contained in:
woodser 2024-05-07 12:37:26 -04:00
parent a6d827c369
commit 7887c450c7
7 changed files with 209 additions and 298 deletions

View file

@ -39,6 +39,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import haveno.common.ThreadUtils; import haveno.common.ThreadUtils;
import haveno.common.UserThread; import haveno.common.UserThread;
import haveno.common.app.Capability;
import haveno.common.crypto.Encryption; import haveno.common.crypto.Encryption;
import haveno.common.crypto.PubKeyRing; import haveno.common.crypto.PubKeyRing;
import haveno.common.proto.ProtoUtil; import haveno.common.proto.ProtoUtil;
@ -66,12 +67,14 @@ import haveno.core.trade.protocol.ProcessModelServiceProvider;
import haveno.core.trade.protocol.TradeListener; import haveno.core.trade.protocol.TradeListener;
import haveno.core.trade.protocol.TradePeer; import haveno.core.trade.protocol.TradePeer;
import haveno.core.trade.protocol.TradeProtocol; import haveno.core.trade.protocol.TradeProtocol;
import haveno.core.trade.statistics.TradeStatistics3;
import haveno.core.util.VolumeUtil; import haveno.core.util.VolumeUtil;
import haveno.core.xmr.model.XmrAddressEntry; import haveno.core.xmr.model.XmrAddressEntry;
import haveno.core.xmr.wallet.XmrWalletService; import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessage; import haveno.network.p2p.AckMessage;
import haveno.network.p2p.NodeAddress; import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.P2PService; import haveno.network.p2p.P2PService;
import haveno.network.p2p.network.TorNetworkNode;
import javafx.beans.property.DoubleProperty; import javafx.beans.property.DoubleProperty;
import javafx.beans.property.IntegerProperty; import javafx.beans.property.IntegerProperty;
import javafx.beans.property.LongProperty; import javafx.beans.property.LongProperty;
@ -137,7 +140,7 @@ public abstract class Trade implements Tradable, Model {
private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_"; private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_";
private static final long SHUTDOWN_TIMEOUT_MS = 60000; private static final long SHUTDOWN_TIMEOUT_MS = 60000;
private static final long SYNC_EVERY_NUM_BLOCKS = 360; // ~1/2 day private static final long SYNC_EVERY_NUM_BLOCKS = 360; // ~1/2 day
private static final long DELETE_AFTER_NUM_BLOCKS = 1; // if deposit requested but not published private static final long DELETE_AFTER_NUM_BLOCKS = 2; // if deposit requested but not published
private static final long DELETE_AFTER_MS = TradeProtocol.TRADE_STEP_TIMEOUT_SECONDS; private static final long DELETE_AFTER_MS = TradeProtocol.TRADE_STEP_TIMEOUT_SECONDS;
private final Object walletLock = new Object(); private final Object walletLock = new Object();
private final Object pollLock = new Object(); private final Object pollLock = new Object();
@ -651,7 +654,7 @@ public abstract class Trade implements Tradable, Model {
if (!isInitialized || isShutDownStarted) return; if (!isInitialized || isShutDownStarted) return;
ThreadUtils.submitToPool(() -> { ThreadUtils.submitToPool(() -> {
if (newValue == Trade.Phase.DEPOSIT_REQUESTED) startPolling(); if (newValue == Trade.Phase.DEPOSIT_REQUESTED) startPolling();
if (newValue == Trade.Phase.DEPOSITS_PUBLISHED) xmrWalletService.freezeOutputs(getSelf().getReserveTxKeyImages()); if (newValue == Trade.Phase.DEPOSITS_PUBLISHED) onDepositsPublished();
if (isDepositsPublished() && !isPayoutUnlocked()) updatePollPeriod(); if (isDepositsPublished() && !isPayoutUnlocked()) updatePollPeriod();
if (isPaymentReceived()) { if (isPaymentReceived()) {
UserThread.execute(() -> { UserThread.execute(() -> {
@ -956,15 +959,17 @@ public abstract class Trade implements Tradable, Model {
try { try {
// ensure wallet is initialized // ensure wallet is initialized
boolean syncedWallet = false;
if (wallet == null) { if (wallet == null) {
log.warn("Wallet is not initialized for {} {}, opening", getClass().getSimpleName(), getId()); log.warn("Wallet is not initialized for {} {}, opening", getClass().getSimpleName(), getId());
getWallet(); getWallet();
syncWallet(true); syncWallet(true);
syncedWallet = true;
} }
// wallet must be synced // sync wallet if deposit requested and payout not unlocked
if (isDepositRequested() && isWalletBehind()) { if (isDepositRequested() && !isPayoutUnlocked() && !syncedWallet) {
log.warn("Wallet is not synced for {} {}, syncing", getClass().getSimpleName(), getId()); log.warn("Syncing wallet on deletion for trade {} {}, syncing", getClass().getSimpleName(), getId());
syncWallet(true); syncWallet(true);
} }
@ -1407,14 +1412,14 @@ public abstract class Trade implements Tradable, Model {
return; return;
} }
// unreserve taker key images // unreserve taker's key images
if (this instanceof TakerTrade) { if (this instanceof TakerTrade) {
ThreadUtils.submitToPool(() -> { ThreadUtils.submitToPool(() -> {
xmrWalletService.thawOutputs(getSelf().getReserveTxKeyImages()); xmrWalletService.thawOutputs(getSelf().getReserveTxKeyImages());
}); });
} }
// unreserve open offer // unreserve maker's open offer
Optional<OpenOffer> openOffer = processModel.getOpenOfferManager().getOpenOfferById(this.getId()); Optional<OpenOffer> openOffer = processModel.getOpenOfferManager().getOpenOfferById(this.getId());
if (this instanceof MakerTrade && openOffer.isPresent()) { if (this instanceof MakerTrade && openOffer.isPresent()) {
processModel.getOpenOfferManager().unreserveOpenOffer(openOffer.get()); processModel.getOpenOfferManager().unreserveOpenOffer(openOffer.get());
@ -2504,6 +2509,47 @@ public abstract class Trade implements Tradable, Model {
} }
} }
private void onDepositsPublished() {
// skip if arbitrator
if (this instanceof ArbitratorTrade) return;
// freeze outputs until spent
xmrWalletService.freezeOutputs(getSelf().getReserveTxKeyImages());
// close open offer or reset address entries
if (this instanceof MakerTrade) {
processModel.getOpenOfferManager().closeOpenOffer(getOffer());
} else {
getXmrWalletService().resetAddressEntriesForOpenOffer(getId());
}
// seller publishes trade statistics
if (this instanceof SellerTrade) {
checkNotNull(getSeller().getDepositTx());
processModel.getP2PService().findPeersCapabilities(getTradePeer().getNodeAddress())
.filter(capabilities -> capabilities.containsAll(Capability.TRADE_STATISTICS_3))
.ifPresentOrElse(capabilities -> {
// Our peer has updated, so as we are the seller we will publish the trade statistics.
// The peer as buyer does not publish anymore with v.1.4.0 (where Capability.TRADE_STATISTICS_3 was added)
String referralId = processModel.getReferralIdService().getOptionalReferralId().orElse(null);
boolean isTorNetworkNode = getProcessModel().getP2PService().getNetworkNode() instanceof TorNetworkNode;
TradeStatistics3 tradeStatistics = TradeStatistics3.from(this, referralId, isTorNetworkNode);
if (tradeStatistics.isValid()) {
log.info("Publishing trade statistics");
processModel.getP2PService().addPersistableNetworkPayload(tradeStatistics, true);
} else {
log.warn("Trade statistics are invalid. We do not publish. {}", tradeStatistics);
}
},
() -> {
log.info("Our peer does not has updated yet, so they will publish the trade statistics. " +
"To avoid duplicates we do not publish from our side.");
});
}
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER // PROTO BUFFER
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -84,7 +84,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
latchTrade(); latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request); Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request); processModel.setTradeMessage(request);
expect(phase(Trade.Phase.INIT) expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_REQUESTED)
.with(request) .with(request)
.from(sender)) .from(sender))
.setup(tasks( .setup(tasks(
@ -99,8 +99,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
}, },
errorMessage -> { errorMessage -> {
handleTaskRunnerFault(sender, request, errorMessage); handleTaskRunnerFault(sender, request, errorMessage);
})) })))
.withTimeout(TRADE_STEP_TIMEOUT_SECONDS))
.executeTasks(true); .executeTasks(true);
awaitTradeLatch(); awaitTradeLatch();
} }
@ -117,4 +116,13 @@ public class ArbitratorProtocol extends DisputeProtocol {
public Class<? extends TradeTask>[] getDepositsConfirmedTasks() { public Class<? extends TradeTask>[] getDepositsConfirmedTasks() {
return new Class[] { SendDepositsConfirmedMessageToBuyer.class, SendDepositsConfirmedMessageToSeller.class }; return new Class[] { SendDepositsConfirmedMessageToBuyer.class, SendDepositsConfirmedMessageToSeller.class };
} }
@Override
public void handleError(String errorMessage) {
// set trade state to send deposit responses with nack
if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) {
trade.setState(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
}
super.handleError(errorMessage);
}
} }

View file

@ -69,8 +69,6 @@ import haveno.core.trade.protocol.tasks.ProcessPaymentReceivedMessage;
import haveno.core.trade.protocol.tasks.ProcessPaymentSentMessage; import haveno.core.trade.protocol.tasks.ProcessPaymentSentMessage;
import haveno.core.trade.protocol.tasks.ProcessSignContractRequest; import haveno.core.trade.protocol.tasks.ProcessSignContractRequest;
import haveno.core.trade.protocol.tasks.SendDepositRequest; import haveno.core.trade.protocol.tasks.SendDepositRequest;
import haveno.core.trade.protocol.tasks.RemoveOffer;
import haveno.core.trade.protocol.tasks.SellerPublishTradeStatistics;
import haveno.core.trade.protocol.tasks.MaybeResendDisputeClosedMessageWithPayout; import haveno.core.trade.protocol.tasks.MaybeResendDisputeClosedMessageWithPayout;
import haveno.core.trade.protocol.tasks.TradeTask; import haveno.core.trade.protocol.tasks.TradeTask;
import haveno.core.trade.protocol.tasks.VerifyPeersAccountAgeWitness; import haveno.core.trade.protocol.tasks.VerifyPeersAccountAgeWitness;
@ -434,13 +432,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
Validator.checkTradeId(processModel.getOfferId(), response); Validator.checkTradeId(processModel.getOfferId(), response);
latchTrade(); latchTrade();
processModel.setTradeMessage(response); processModel.setTradeMessage(response);
expect(anyState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK) expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_REQUESTED, Trade.Phase.DEPOSITS_PUBLISHED)
.with(response) .with(response)
.from(sender)) .from(sender))
.setup(tasks( .setup(tasks(
ProcessDepositResponse.class, ProcessDepositResponse.class)
RemoveOffer.class,
SellerPublishTradeStatistics.class)
.using(new TradeTaskRunner(trade, .using(new TradeTaskRunner(trade,
() -> { () -> {
stopTimeout(); stopTimeout();
@ -672,8 +668,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
} }
if (ackMessage.isSuccess()) { if (ackMessage.isSuccess()) {
log.info("Received AckMessage for {} from {} with tradeId {} and uid {}", log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
ackMessage.getSourceMsgClassName(), sender, trade.getId(), ackMessage.getSourceUid());
// handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time // handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) { if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
@ -689,8 +684,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
else peer.setDepositsConfirmedMessageAcked(true); else peer.setDepositsConfirmedMessageAcked(true);
} }
} else { } else {
String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage(); log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
log.warn(err);
// set trade state on deposit request nack // set trade state on deposit request nack
if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) { if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) {

View file

@ -43,7 +43,7 @@ import java.util.UUID;
@Slf4j @Slf4j
public class ArbitratorProcessDepositRequest extends TradeTask { public class ArbitratorProcessDepositRequest extends TradeTask {
private boolean depositTxsRelayed = false; private Throwable error;
@SuppressWarnings({"unused"}) @SuppressWarnings({"unused"})
public ArbitratorProcessDepositRequest(TaskRunner taskHandler, Trade trade) { public ArbitratorProcessDepositRequest(TaskRunner taskHandler, Trade trade) {
@ -52,132 +52,155 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
@Override @Override
protected void run() { protected void run() {
MoneroDaemon daemon = trade.getXmrWalletService().getDaemon();
try { try {
runInterceptHook(); runInterceptHook();
// get contract and signature // check if trade is failed
String contractAsJson = trade.getContractAsJson(); if (trade.getState() == Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED) throw new RuntimeException("Cannot process deposit request because trade is already failed, tradeId=" + trade.getId());
DepositRequest request = (DepositRequest) processModel.getTradeMessage(); // TODO (woodser): verify response
byte[] signature = request.getContractSignature();
// get trader info
TradePeer trader = trade.getTradePeer(processModel.getTempTradePeerNodeAddress());
if (trader == null) throw new RuntimeException(request.getClass().getSimpleName() + " is not from maker, taker, or arbitrator");
PubKeyRing peerPubKeyRing = trader.getPubKeyRing();
// verify signature
if (!HavenoUtils.isSignatureValid(peerPubKeyRing, contractAsJson, signature)) {
throw new RuntimeException("Peer's contract signature is invalid");
}
// set peer's signature
trader.setContractSignature(signature);
// collect expected values
Offer offer = trade.getOffer();
boolean isFromTaker = trader == trade.getTaker();
boolean isFromBuyer = trader == trade.getBuyer();
BigInteger tradeFee = isFromTaker ? trade.getTakerFee() : trade.getMakerFee();
BigInteger sendTradeAmount = isFromBuyer ? BigInteger.ZERO : trade.getAmount();
BigInteger securityDeposit = isFromBuyer ? trade.getBuyerSecurityDepositBeforeMiningFee() : trade.getSellerSecurityDepositBeforeMiningFee();
String depositAddress = processModel.getMultisigAddress();
// verify deposit tx
MoneroTx verifiedTx;
try {
verifiedTx = trade.getXmrWalletService().verifyDepositTx(
offer.getId(),
tradeFee,
trade.getProcessModel().getTradeFeeAddress(),
sendTradeAmount,
securityDeposit,
depositAddress,
trader.getDepositTxHash(),
request.getDepositTxHex(),
request.getDepositTxKey(),
null);
} catch (Exception e) {
throw new RuntimeException("Error processing deposit tx from " + (isFromTaker ? "taker " : "maker ") + trader.getNodeAddress() + ", offerId=" + offer.getId() + ": " + e.getMessage());
}
// extend timeout
if (isTimedOut()) throw new RuntimeException("Trade protocol has timed out while verifying deposit tx for {} {}" + trade.getClass().getSimpleName() + " " + trade.getShortId());
trade.startProtocolTimeout();
// set deposit info
trader.setSecurityDeposit(securityDeposit.subtract(verifiedTx.getFee())); // subtract mining fee from security deposit
trader.setDepositTxFee(verifiedTx.getFee());
trader.setDepositTxHex(request.getDepositTxHex());
trader.setDepositTxKey(request.getDepositTxKey());
if (request.getPaymentAccountKey() != null) trader.setPaymentAccountKey(request.getPaymentAccountKey());
// relay deposit txs when both available
if (processModel.getMaker().getDepositTxHex() != null && processModel.getTaker().getDepositTxHex() != null) {
// update trade state
trade.setState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST);
processModel.getTradeManager().requestPersistence();
// relay txs
MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true);
MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true);
if (!makerResult.isGood()) throw new RuntimeException("Error submitting maker deposit tx: " + JsonUtils.serialize(makerResult));
if (!takerResult.isGood()) throw new RuntimeException("Error submitting taker deposit tx: " + JsonUtils.serialize(takerResult));
daemon.relayTxsByHash(Arrays.asList(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash()));
depositTxsRelayed = true;
// update trade state
log.info("Arbitrator submitted deposit txs for trade " + trade.getId());
trade.setState(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS);
processModel.getTradeManager().requestPersistence();
// create deposit response
DepositResponse response = new DepositResponse(
trade.getOffer().getId(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
null,
trade.getBuyer().getSecurityDeposit().longValue(),
trade.getSeller().getSecurityDeposit().longValue());
// send deposit response to maker and taker
sendDepositResponse(trade.getMaker().getNodeAddress(), trade.getMaker().getPubKeyRing(), response);
sendDepositResponse(trade.getTaker().getNodeAddress(), trade.getTaker().getPubKeyRing(), response);
} else {
if (processModel.getMaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from maker for trade " + trade.getId());
if (processModel.getTaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from taker for trade " + trade.getId());
}
// update trade state
trade.setStateIfValidTransitionTo(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
// process request
processDepositRequest();
complete(); complete();
} catch (Throwable t) { } catch (Throwable t) {
this.error = t;
// handle error before deposits relayed t.printStackTrace();
if (!depositTxsRelayed) { trade.setState(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
try {
daemon.flushTxPool(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash());
} catch (Exception e) {
e.printStackTrace();
}
// create deposit response with error
DepositResponse response = new DepositResponse(
trade.getOffer().getId(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
t.getMessage(),
trade.getBuyer().getSecurityDeposit().longValue(),
trade.getSeller().getSecurityDeposit().longValue());
// send deposit response to maker and taker
sendDepositResponse(trade.getMaker().getNodeAddress(), trade.getMaker().getPubKeyRing(), response);
sendDepositResponse(trade.getTaker().getNodeAddress(), trade.getTaker().getPubKeyRing(), response);
}
failed(t); failed(t);
} }
processModel.getTradeManager().requestPersistence();
}
private void processDepositRequest() {
// get contract and signature
String contractAsJson = trade.getContractAsJson();
DepositRequest request = (DepositRequest) processModel.getTradeMessage(); // TODO (woodser): verify response
byte[] signature = request.getContractSignature();
// get trader info
TradePeer trader = trade.getTradePeer(processModel.getTempTradePeerNodeAddress());
if (trader == null) throw new RuntimeException(request.getClass().getSimpleName() + " is not from maker, taker, or arbitrator");
PubKeyRing peerPubKeyRing = trader.getPubKeyRing();
// verify signature
if (!HavenoUtils.isSignatureValid(peerPubKeyRing, contractAsJson, signature)) {
throw new RuntimeException("Peer's contract signature is invalid");
}
// set peer's signature
trader.setContractSignature(signature);
// collect expected values
Offer offer = trade.getOffer();
boolean isFromTaker = trader == trade.getTaker();
boolean isFromBuyer = trader == trade.getBuyer();
BigInteger tradeFee = isFromTaker ? trade.getTakerFee() : trade.getMakerFee();
BigInteger sendTradeAmount = isFromBuyer ? BigInteger.ZERO : trade.getAmount();
BigInteger securityDeposit = isFromBuyer ? trade.getBuyerSecurityDepositBeforeMiningFee() : trade.getSellerSecurityDepositBeforeMiningFee();
String depositAddress = processModel.getMultisigAddress();
// verify deposit tx
MoneroTx verifiedTx;
try {
verifiedTx = trade.getXmrWalletService().verifyDepositTx(
offer.getId(),
tradeFee,
trade.getProcessModel().getTradeFeeAddress(),
sendTradeAmount,
securityDeposit,
depositAddress,
trader.getDepositTxHash(),
request.getDepositTxHex(),
request.getDepositTxKey(),
null);
} catch (Exception e) {
throw new RuntimeException("Error processing deposit tx from " + (isFromTaker ? "taker " : "maker ") + trader.getNodeAddress() + ", offerId=" + offer.getId() + ": " + e.getMessage());
}
// update trade state
trader.setSecurityDeposit(securityDeposit.subtract(verifiedTx.getFee())); // subtract mining fee from security deposit
trader.setDepositTxFee(verifiedTx.getFee());
trader.setDepositTxHex(request.getDepositTxHex());
trader.setDepositTxKey(request.getDepositTxKey());
if (request.getPaymentAccountKey() != null) trader.setPaymentAccountKey(request.getPaymentAccountKey());
processModel.getTradeManager().requestPersistence();
// relay deposit txs when both available
MoneroDaemon daemon = trade.getXmrWalletService().getDaemon();
if (processModel.getMaker().getDepositTxHex() != null && processModel.getTaker().getDepositTxHex() != null) {
// check timeout and extend just before relaying
if (isTimedOut()) throw new RuntimeException("Trade protocol has timed out before relaying deposit txs for {} {}" + trade.getClass().getSimpleName() + " " + trade.getShortId());
trade.addInitProgressStep();
try {
// submit txs to pool but do not relay
MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true);
if (!makerResult.isGood()) throw new RuntimeException("Error submitting maker deposit tx: " + JsonUtils.serialize(makerResult));
MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true);
if (!takerResult.isGood()) throw new RuntimeException("Error submitting taker deposit tx: " + JsonUtils.serialize(takerResult));
// relay txs
daemon.relayTxsByHash(Arrays.asList(processModel.getMaker().getDepositTxHash(), processModel.getTaker().getDepositTxHash()));
// update trade state
log.info("Arbitrator published deposit txs for trade " + trade.getId());
trade.setState(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS);
} catch (Exception e) {
// flush txs from pool
try {
daemon.flushTxPool(processModel.getMaker().getDepositTxHash());
} catch (Exception e2) {
e2.printStackTrace();
}
try {
daemon.flushTxPool(processModel.getTaker().getDepositTxHash());
} catch (Exception e2) {
e2.printStackTrace();
}
throw e;
}
} else {
// subscribe to trade state once to send responses with ack or nack
trade.stateProperty().addListener((obs, oldState, newState) -> {
if (newState == Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED) {
sendDepositResponses(error == null ? "Arbitrator failed to publish deposit txs within timeout for trade " + trade.getId() : error.getMessage());
} else if (newState == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) {
sendDepositResponses(null);
}
});
if (processModel.getMaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from maker for trade " + trade.getId());
if (processModel.getTaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from taker for trade " + trade.getId());
}
}
private boolean isTimedOut() {
return !processModel.getTradeManager().hasOpenTrade(trade);
}
private void sendDepositResponses(String errorMessage) {
// create deposit response
DepositResponse response = new DepositResponse(
trade.getOffer().getId(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
errorMessage,
trade.getBuyer().getSecurityDeposit().longValue(),
trade.getSeller().getSecurityDeposit().longValue());
// send deposit response to maker and taker
sendDepositResponse(trade.getMaker().getNodeAddress(), trade.getMaker().getPubKeyRing(), response);
sendDepositResponse(trade.getTaker().getNodeAddress(), trade.getTaker().getPubKeyRing(), response);
} }
private void sendDepositResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, DepositResponse response) { private void sendDepositResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, DepositResponse response) {
@ -191,12 +214,7 @@ public class ArbitratorProcessDepositRequest extends TradeTask {
public void onFault(String errorMessage) { public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", response.getClass().getSimpleName(), nodeAddress, trade.getId(), errorMessage); log.error("Sending {} failed: uid={}; peer={}; error={}", response.getClass().getSimpleName(), nodeAddress, trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + response + "\nerrorMessage=" + errorMessage); appendToErrorMessage("Sending message failed: message=" + response + "\nerrorMessage=" + errorMessage);
failed();
} }
}); });
} }
private boolean isTimedOut() {
return !processModel.getTradeManager().hasOpenTrade(trade);
}
} }

View file

@ -1,66 +0,0 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.MakerTrade;
import haveno.core.trade.Trade;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class RemoveOffer extends TradeTask {
public RemoveOffer(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected void run() {
try {
runInterceptHook();
if (trade instanceof MakerTrade) {
processModel.getOpenOfferManager().closeOpenOffer(checkNotNull(trade.getOffer()));
} else {
trade.getXmrWalletService().resetAddressEntriesForOpenOffer(trade.getId());
}
complete();
} catch (Throwable t) {
failed(t);
}
}
}

View file

@ -1,75 +0,0 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import haveno.common.app.Capability;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.SellerTrade;
import haveno.core.trade.Trade;
import haveno.core.trade.statistics.TradeStatistics3;
import haveno.network.p2p.network.TorNetworkNode;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class SellerPublishTradeStatistics extends TradeTask {
public SellerPublishTradeStatistics(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected void run() {
try {
runInterceptHook();
// skip if not seller
if (!(trade instanceof SellerTrade)) {
complete();
return;
}
checkNotNull(trade.getSeller().getDepositTx());
processModel.getP2PService().findPeersCapabilities(trade.getTradePeer().getNodeAddress())
.filter(capabilities -> capabilities.containsAll(Capability.TRADE_STATISTICS_3))
.ifPresentOrElse(capabilities -> {
// Our peer has updated, so as we are the seller we will publish the trade statistics.
// The peer as buyer does not publish anymore with v.1.4.0 (where Capability.TRADE_STATISTICS_3 was added)
String referralId = processModel.getReferralIdService().getOptionalReferralId().orElse(null);
boolean isTorNetworkNode = model.getProcessModel().getP2PService().getNetworkNode() instanceof TorNetworkNode;
TradeStatistics3 tradeStatistics = TradeStatistics3.from(trade, referralId, isTorNetworkNode);
if (tradeStatistics.isValid()) {
log.info("Publishing trade statistics");
processModel.getP2PService().addPersistableNetworkPayload(tradeStatistics, true);
} else {
log.warn("Trade statistics are invalid. We do not publish. {}", tradeStatistics);
}
complete();
},
() -> {
log.info("Our peer does not has updated yet, so they will publish the trade statistics. " +
"To avoid duplicates we do not publish from our side.");
complete();
});
} catch (Throwable t) {
failed(t);
}
}
}

View file

@ -31,9 +31,7 @@ import haveno.core.trade.protocol.tasks.BuyerSendPaymentSentMessage;
import haveno.core.trade.protocol.tasks.MakerSetLockTime; import haveno.core.trade.protocol.tasks.MakerSetLockTime;
import haveno.core.trade.protocol.tasks.ProcessPaymentReceivedMessage; import haveno.core.trade.protocol.tasks.ProcessPaymentReceivedMessage;
import haveno.core.trade.protocol.tasks.ProcessPaymentSentMessage; import haveno.core.trade.protocol.tasks.ProcessPaymentSentMessage;
import haveno.core.trade.protocol.tasks.RemoveOffer;
import haveno.core.trade.protocol.tasks.SellerPreparePaymentReceivedMessage; import haveno.core.trade.protocol.tasks.SellerPreparePaymentReceivedMessage;
import haveno.core.trade.protocol.tasks.SellerPublishTradeStatistics;
import haveno.core.trade.protocol.tasks.SellerSendPaymentReceivedMessageToBuyer; import haveno.core.trade.protocol.tasks.SellerSendPaymentReceivedMessageToBuyer;
import haveno.core.trade.protocol.tasks.VerifyPeersAccountAgeWitness; import haveno.core.trade.protocol.tasks.VerifyPeersAccountAgeWitness;
import haveno.desktop.common.view.FxmlView; import haveno.desktop.common.view.FxmlView;
@ -85,9 +83,6 @@ public class DebugView extends InitializableView<GridPane, Void> {
ApplyFilter.class, ApplyFilter.class,
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
//SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishTradeStatistics.class,
ProcessPaymentSentMessage.class, ProcessPaymentSentMessage.class,
ApplyFilter.class, ApplyFilter.class,
@ -104,8 +99,6 @@ public class DebugView extends InitializableView<GridPane, Void> {
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
MakerSetLockTime.class, MakerSetLockTime.class,
RemoveOffer.class,
ApplyFilter.class, ApplyFilter.class,
BuyerPreparePaymentSentMessage.class, BuyerPreparePaymentSentMessage.class,
BuyerSendPaymentSentMessage.class, BuyerSendPaymentSentMessage.class,
@ -134,18 +127,11 @@ public class DebugView extends InitializableView<GridPane, Void> {
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
MakerSetLockTime.class, MakerSetLockTime.class,
//SellerAsMakerProcessDepositTxMessage.class,
RemoveOffer.class,
//SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishTradeStatistics.class,
ProcessPaymentSentMessage.class, ProcessPaymentSentMessage.class,
ApplyFilter.class, ApplyFilter.class,
ApplyFilter.class, ApplyFilter.class,
SellerPreparePaymentReceivedMessage.class, SellerPreparePaymentReceivedMessage.class,
//SellerBroadcastPayoutTx.class, // TODO (woodser): removed from main pipeline; debug view?
SellerSendPaymentReceivedMessageToBuyer.class SellerSendPaymentReceivedMessageToBuyer.class
) )
)); ));