mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-03-29 18:48:57 +00:00
refactor message resending, reprocessing, and ack handling
This commit is contained in:
parent
63917fe8cc
commit
3cde880b1c
17 changed files with 419 additions and 243 deletions
core/src/main/java/haveno/core/trade
Trade.java
protocol
ProcessModel.javaSellerProtocol.javaTradePeer.javaTradeProtocol.java
tasks
BuyerSendPaymentSentMessage.javaBuyerSendPaymentSentMessageToArbitrator.javaBuyerSendPaymentSentMessageToSeller.javaSellerSendPaymentReceivedMessage.javaSellerSendPaymentReceivedMessageToBuyer.javaSendDepositsConfirmedMessage.javaSendDepositsConfirmedMessageToArbitrator.javaSendDepositsConfirmedMessageToBuyer.javaSendDepositsConfirmedMessageToSeller.java
desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades
proto/src/main/proto
|
@ -46,7 +46,6 @@ import haveno.common.taskrunner.Model;
|
|||
import haveno.common.util.Utilities;
|
||||
import haveno.core.monetary.Price;
|
||||
import haveno.core.monetary.Volume;
|
||||
import haveno.core.network.MessageState;
|
||||
import haveno.core.offer.Offer;
|
||||
import haveno.core.offer.OfferDirection;
|
||||
import haveno.core.offer.OpenOffer;
|
||||
|
@ -195,7 +194,8 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
SELLER_SENT_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
|
||||
SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
|
||||
SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
|
||||
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
|
||||
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
|
||||
BUYER_RECEIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
|
||||
|
||||
@NotNull
|
||||
public Phase getPhase() {
|
||||
|
@ -603,12 +603,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
}
|
||||
}
|
||||
|
||||
// notified from TradeProtocol of ack messages
|
||||
public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
|
||||
// notified from TradeProtocol of ack messages
|
||||
public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
|
||||
for (TradeListener listener : new ArrayList<TradeListener>(tradeListeners)) { // copy array to allow listener invocation to unregister listener without concurrent modification exception
|
||||
listener.onAckMessage(ackMessage, sender);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -618,8 +618,9 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
public void initialize(ProcessModelServiceProvider serviceProvider) {
|
||||
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
|
||||
|
||||
// done if payout unlocked and marked complete
|
||||
if (isPayoutUnlocked() && isCompleted()) {
|
||||
// skip initialization if trade is complete
|
||||
// starting in v1.0.19, seller resends payment received message until acked or stored in mailbox
|
||||
if (isPayoutUnlocked() && isCompleted() && !getProtocol().needsToResendPaymentReceivedMessages()) {
|
||||
clearAndShutDown();
|
||||
return;
|
||||
}
|
||||
|
@ -733,15 +734,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
xmrWalletService.addWalletListener(idlePayoutSyncer);
|
||||
}
|
||||
|
||||
// TODO: buyer's payment sent message state property became unsynced if shut down while awaiting ack from seller. fixed in v1.0.19 so this check can be removed?
|
||||
if (isBuyer()) {
|
||||
MessageState expectedState = getPaymentSentMessageState();
|
||||
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStatePropertySeller().get()) {
|
||||
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStatePropertySeller().get());
|
||||
processModel.getPaymentSentMessageStatePropertySeller().set(expectedState);
|
||||
}
|
||||
}
|
||||
|
||||
// handle confirmations
|
||||
walletHeight.addListener((observable, oldValue, newValue) -> {
|
||||
importMultisigHexIfScheduled();
|
||||
|
@ -771,11 +763,20 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
}
|
||||
}
|
||||
|
||||
// start polling if deposit requested
|
||||
if (isDepositRequested()) tryInitPolling();
|
||||
// init syncing if deposit requested
|
||||
if (isDepositRequested()) {
|
||||
tryInitSyncing();
|
||||
}
|
||||
isFullyInitialized = true;
|
||||
}
|
||||
|
||||
public void reprocessApplicableMessages() {
|
||||
if (!isDepositRequested() || isPayoutUnlocked() || isCompleted()) return;
|
||||
getProtocol().maybeReprocessPaymentSentMessage(false);
|
||||
getProtocol().maybeReprocessPaymentReceivedMessage(false);
|
||||
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);
|
||||
}
|
||||
|
||||
public void awaitInitialized() {
|
||||
while (!isFullyInitialized) HavenoUtils.waitFor(100); // TODO: use proper notification and refactor isInitialized, fullyInitialized, and arbitrator idling
|
||||
}
|
||||
|
@ -1535,7 +1536,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
peer.setUpdatedMultisigHex(null);
|
||||
peer.setDisputeClosedMessage(null);
|
||||
peer.setPaymentSentMessage(null);
|
||||
peer.setPaymentReceivedMessage(null);
|
||||
if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2049,25 +2050,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
throw new IllegalArgumentException("Trade is not buyer, seller, or arbitrator");
|
||||
}
|
||||
|
||||
public MessageState getPaymentSentMessageState() {
|
||||
if (isPaymentReceived()) return MessageState.ACKNOWLEDGED;
|
||||
if (processModel.getPaymentSentMessageStatePropertySeller().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED;
|
||||
switch (state) {
|
||||
case BUYER_SENT_PAYMENT_SENT_MSG:
|
||||
return MessageState.SENT;
|
||||
case BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG:
|
||||
return MessageState.ARRIVED;
|
||||
case BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG:
|
||||
return MessageState.STORED_IN_MAILBOX;
|
||||
case SELLER_RECEIVED_PAYMENT_SENT_MSG:
|
||||
return MessageState.ACKNOWLEDGED;
|
||||
case BUYER_SEND_FAILED_PAYMENT_SENT_MSG:
|
||||
return MessageState.FAILED;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public String getPeerRole(TradePeer peer) {
|
||||
if (peer == getBuyer()) return "Buyer";
|
||||
if (peer == getSeller()) return "Seller";
|
||||
|
@ -2444,11 +2426,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
|
||||
// sync and reprocess messages on new thread
|
||||
if (isInitialized && connection != null && !Boolean.FALSE.equals(xmrConnectionService.isConnected())) {
|
||||
ThreadUtils.execute(() -> tryInitPolling(), getId());
|
||||
ThreadUtils.execute(() -> tryInitSyncing(), getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
private void tryInitPolling() {
|
||||
|
||||
private void tryInitSyncing() {
|
||||
if (isShutDownStarted) return;
|
||||
|
||||
// set known deposit txs
|
||||
|
@ -2457,24 +2440,18 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
|
||||
// start polling
|
||||
if (!isIdling()) {
|
||||
tryInitPollingAux();
|
||||
doTryInitSyncing();
|
||||
} else {
|
||||
long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getPollPeriod()); // random time to start polling
|
||||
UserThread.runAfter(() -> ThreadUtils.execute(() -> {
|
||||
if (!isShutDownStarted) tryInitPollingAux();
|
||||
if (!isShutDownStarted) doTryInitSyncing();
|
||||
}, getId()), startSyncingInMs / 1000l);
|
||||
}
|
||||
}
|
||||
|
||||
private void tryInitPollingAux() {
|
||||
private void doTryInitSyncing() {
|
||||
if (!wasWalletSynced) trySyncWallet(true);
|
||||
updatePollPeriod();
|
||||
|
||||
// reprocess pending messages
|
||||
getProtocol().maybeReprocessPaymentSentMessage(false);
|
||||
getProtocol().maybeReprocessPaymentReceivedMessage(false);
|
||||
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);
|
||||
|
||||
startPolling();
|
||||
}
|
||||
|
||||
|
@ -2825,7 +2802,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
|
|||
if (!isShutDownStarted) wallet = getWallet();
|
||||
restartInProgress = false;
|
||||
pollWallet();
|
||||
if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId());
|
||||
if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitSyncing(), getId());
|
||||
}
|
||||
|
||||
private void setStateDepositsSeen() {
|
||||
|
|
|
@ -44,6 +44,7 @@ import haveno.core.account.witness.AccountAgeWitnessService;
|
|||
import haveno.core.filter.FilterManager;
|
||||
import haveno.core.network.MessageState;
|
||||
import haveno.core.offer.Offer;
|
||||
import haveno.core.offer.OfferDirection;
|
||||
import haveno.core.offer.OpenOfferManager;
|
||||
import haveno.core.payment.PaymentAccount;
|
||||
import haveno.core.payment.payload.PaymentAccountPayload;
|
||||
|
@ -73,6 +74,9 @@ import org.bitcoinj.core.Coin;
|
|||
import org.bitcoinj.core.Transaction;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
// Fields marked as transient are only used during protocol execution which are based on directMessages so we do not
|
||||
|
@ -161,15 +165,11 @@ public class ProcessModel implements Model, PersistablePayload {
|
|||
@Getter
|
||||
@Setter
|
||||
private boolean importMultisigHexScheduled;
|
||||
|
||||
// We want to indicate the user the state of the message delivery of the
|
||||
// PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet.
|
||||
// To enable that even after restart we persist the state.
|
||||
@Setter
|
||||
private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
@Setter
|
||||
private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false);
|
||||
@Deprecated
|
||||
private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
@Deprecated
|
||||
private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
|
||||
public ProcessModel(String offerId, String accountId, PubKeyRing pubKeyRing) {
|
||||
this(offerId, accountId, pubKeyRing, new TradePeer(), new TradePeer(), new TradePeer());
|
||||
|
@ -191,6 +191,31 @@ public class ProcessModel implements Model, PersistablePayload {
|
|||
this.offer = offer;
|
||||
this.provider = provider;
|
||||
this.tradeManager = tradeManager;
|
||||
for (TradePeer peer : getTradePeers()) {
|
||||
peer.applyTransient(tradeManager);
|
||||
}
|
||||
|
||||
// migrate deprecated fields to new model for v1.0.19
|
||||
if (paymentSentMessageStatePropertySeller.get() != MessageState.UNDEFINED && getSeller().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) {
|
||||
getSeller().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertySeller.get());
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
if (paymentSentMessageStatePropertyArbitrator.get() != MessageState.UNDEFINED && getArbitrator().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) {
|
||||
getArbitrator().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertyArbitrator.get());
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
private List<TradePeer> getTradePeers() {
|
||||
return Arrays.asList(maker, taker, arbitrator);
|
||||
}
|
||||
|
||||
private TradePeer getBuyer() {
|
||||
return offer.getDirection() == OfferDirection.BUY ? maker : taker;
|
||||
}
|
||||
|
||||
private TradePeer getSeller() {
|
||||
return offer.getDirection() == OfferDirection.BUY ? taker : maker;
|
||||
}
|
||||
|
||||
|
||||
|
@ -245,14 +270,13 @@ public class ProcessModel implements Model, PersistablePayload {
|
|||
processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress()));
|
||||
processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress()));
|
||||
|
||||
// deprecated fields need to be read in order to migrate to new fields
|
||||
String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller());
|
||||
MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString);
|
||||
processModel.setPaymentSentMessageStateSeller(paymentSentMessageStateSeller);
|
||||
|
||||
processModel.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateSeller);
|
||||
String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator());
|
||||
MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString);
|
||||
processModel.setPaymentSentMessageStateArbitrator(paymentSentMessageStateArbitrator);
|
||||
|
||||
processModel.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateArbitrator);
|
||||
return processModel;
|
||||
}
|
||||
|
||||
|
@ -279,40 +303,8 @@ public class ProcessModel implements Model, PersistablePayload {
|
|||
return getP2PService().getAddress();
|
||||
}
|
||||
|
||||
void setPaymentSentAckMessageSeller(AckMessage ackMessage) {
|
||||
MessageState messageState = ackMessage.isSuccess() ?
|
||||
MessageState.ACKNOWLEDGED :
|
||||
MessageState.FAILED;
|
||||
setPaymentSentMessageStateSeller(messageState);
|
||||
}
|
||||
|
||||
void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) {
|
||||
MessageState messageState = ackMessage.isSuccess() ?
|
||||
MessageState.ACKNOWLEDGED :
|
||||
MessageState.FAILED;
|
||||
setPaymentSentMessageStateArbitrator(messageState);
|
||||
}
|
||||
|
||||
public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) {
|
||||
this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty);
|
||||
if (tradeManager != null) {
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessageStateProperty) {
|
||||
this.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateProperty);
|
||||
if (tradeManager != null) {
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isPaymentSentMessageAckedBySeller() {
|
||||
return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED;
|
||||
}
|
||||
|
||||
public boolean isPaymentSentMessageAckedByArbitrator() {
|
||||
return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED;
|
||||
public boolean isPaymentReceivedMessagesReceived() {
|
||||
return getArbitrator().isPaymentReceivedMessageReceived() && getBuyer().isPaymentReceivedMessageReceived();
|
||||
}
|
||||
|
||||
void setDepositTxSentAckMessage(AckMessage ackMessage) {
|
||||
|
|
|
@ -53,6 +53,9 @@ import lombok.extern.slf4j.Slf4j;
|
|||
|
||||
@Slf4j
|
||||
public class SellerProtocol extends DisputeProtocol {
|
||||
|
||||
private static final long RESEND_PAYMENT_RECEIVED_MSGS_AFTER = 1741629525730L; // Mar 10 2025 17:58 UTC
|
||||
|
||||
enum SellerEvent implements FluentProtocol.Event {
|
||||
STARTUP,
|
||||
DEPOSIT_TXS_CONFIRMED,
|
||||
|
@ -69,31 +72,37 @@ public class SellerProtocol extends DisputeProtocol {
|
|||
|
||||
// re-send payment received message if payout not published
|
||||
ThreadUtils.execute(() -> {
|
||||
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
|
||||
if (!needsToResendPaymentReceivedMessages()) return;
|
||||
synchronized (trade.getLock()) {
|
||||
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
|
||||
if (trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.isPayoutPublished()) {
|
||||
latchTrade();
|
||||
given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
|
||||
.with(SellerEvent.STARTUP))
|
||||
.setup(tasks(
|
||||
SellerSendPaymentReceivedMessageToBuyer.class,
|
||||
SellerSendPaymentReceivedMessageToArbitrator.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
unlatchTrade();
|
||||
},
|
||||
(errorMessage) -> {
|
||||
log.warn("Error sending PaymentReceivedMessage on startup: " + errorMessage);
|
||||
unlatchTrade();
|
||||
})))
|
||||
.executeTasks();
|
||||
awaitTradeLatch();
|
||||
}
|
||||
if (!needsToResendPaymentReceivedMessages()) return;
|
||||
latchTrade();
|
||||
given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
|
||||
.with(SellerEvent.STARTUP))
|
||||
.setup(tasks(
|
||||
SellerSendPaymentReceivedMessageToBuyer.class,
|
||||
SellerSendPaymentReceivedMessageToArbitrator.class)
|
||||
.using(new TradeTaskRunner(trade,
|
||||
() -> {
|
||||
unlatchTrade();
|
||||
},
|
||||
(errorMessage) -> {
|
||||
log.warn("Error sending PaymentReceivedMessage on startup: " + errorMessage);
|
||||
unlatchTrade();
|
||||
})))
|
||||
.executeTasks();
|
||||
awaitTradeLatch();
|
||||
}
|
||||
}, trade.getId());
|
||||
}
|
||||
|
||||
public boolean needsToResendPaymentReceivedMessages() {
|
||||
return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled();
|
||||
}
|
||||
|
||||
private boolean resendPaymentReceivedMessagesEnabled() {
|
||||
return trade.getTakeOfferDate().getTime() > RESEND_PAYMENT_RECEIVED_MSGS_AFTER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onTradeMessage(message, peer);
|
||||
|
|
|
@ -24,12 +24,17 @@ import haveno.common.crypto.PubKeyRing;
|
|||
import haveno.common.proto.ProtoUtil;
|
||||
import haveno.common.proto.persistable.PersistablePayload;
|
||||
import haveno.core.account.witness.AccountAgeWitness;
|
||||
import haveno.core.network.MessageState;
|
||||
import haveno.core.payment.payload.PaymentAccountPayload;
|
||||
import haveno.core.proto.CoreProtoResolver;
|
||||
import haveno.core.support.dispute.messages.DisputeClosedMessage;
|
||||
import haveno.core.trade.TradeManager;
|
||||
import haveno.core.trade.messages.PaymentReceivedMessage;
|
||||
import haveno.core.trade.messages.PaymentSentMessage;
|
||||
import haveno.network.p2p.AckMessage;
|
||||
import haveno.network.p2p.NodeAddress;
|
||||
import javafx.beans.property.ObjectProperty;
|
||||
import javafx.beans.property.SimpleObjectProperty;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -57,6 +62,7 @@ public final class TradePeer implements PersistablePayload {
|
|||
@Nullable
|
||||
transient private byte[] preparedDepositTx;
|
||||
transient private MoneroTxWallet depositTx;
|
||||
transient private TradeManager tradeManager;
|
||||
|
||||
// Persistable mutable
|
||||
@Nullable
|
||||
|
@ -96,7 +102,6 @@ public final class TradePeer implements PersistablePayload {
|
|||
@Getter
|
||||
private DisputeClosedMessage disputeClosedMessage;
|
||||
|
||||
|
||||
// added in v 0.6
|
||||
@Nullable
|
||||
private byte[] accountAgeWitnessNonce;
|
||||
|
@ -142,13 +147,32 @@ public final class TradePeer implements PersistablePayload {
|
|||
private long payoutAmount;
|
||||
@Nullable
|
||||
private String updatedMultisigHex;
|
||||
@Getter
|
||||
@Deprecated
|
||||
private boolean depositsConfirmedMessageAcked;
|
||||
|
||||
// We want to indicate the user the state of the message delivery of the payment
|
||||
// confirmation messages. We do an automatic re-send in case it was not ACKed yet.
|
||||
// To enable that even after restart we persist the state.
|
||||
@Setter
|
||||
boolean depositsConfirmedMessageAcked;
|
||||
private ObjectProperty<MessageState> depositsConfirmedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
@Setter
|
||||
private ObjectProperty<MessageState> paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
@Setter
|
||||
private ObjectProperty<MessageState> paymentReceivedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
|
||||
|
||||
public TradePeer() {
|
||||
}
|
||||
|
||||
public void applyTransient(TradeManager tradeManager) {
|
||||
this.tradeManager = tradeManager;
|
||||
|
||||
// migrate deprecated fields to new model for v1.0.19
|
||||
if (depositsConfirmedMessageAcked && depositsConfirmedMessageStateProperty.get() == MessageState.UNDEFINED) {
|
||||
depositsConfirmedMessageStateProperty.set(MessageState.ACKNOWLEDGED);
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
public BigInteger getDepositTxFee() {
|
||||
return BigInteger.valueOf(depositTxFee);
|
||||
}
|
||||
|
@ -181,6 +205,60 @@ public final class TradePeer implements PersistablePayload {
|
|||
this.payoutAmount = payoutAmount.longValueExact();
|
||||
}
|
||||
|
||||
void setDepositsConfirmedAckMessage(AckMessage ackMessage) {
|
||||
MessageState messageState = ackMessage.isSuccess() ?
|
||||
MessageState.ACKNOWLEDGED :
|
||||
MessageState.FAILED;
|
||||
setDepositsConfirmedMessageState(messageState);
|
||||
}
|
||||
|
||||
void setPaymentSentAckMessage(AckMessage ackMessage) {
|
||||
MessageState messageState = ackMessage.isSuccess() ?
|
||||
MessageState.ACKNOWLEDGED :
|
||||
MessageState.FAILED;
|
||||
setPaymentSentMessageState(messageState);
|
||||
}
|
||||
|
||||
void setPaymentReceivedAckMessage(AckMessage ackMessage) {
|
||||
MessageState messageState = ackMessage.isSuccess() ?
|
||||
MessageState.ACKNOWLEDGED :
|
||||
MessageState.FAILED;
|
||||
setPaymentReceivedMessageState(messageState);
|
||||
}
|
||||
|
||||
public void setDepositsConfirmedMessageState(MessageState depositsConfirmedMessageStateProperty) {
|
||||
this.depositsConfirmedMessageStateProperty.set(depositsConfirmedMessageStateProperty);
|
||||
if (tradeManager != null) {
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
public void setPaymentSentMessageState(MessageState paymentSentMessageStateProperty) {
|
||||
this.paymentSentMessageStateProperty.set(paymentSentMessageStateProperty);
|
||||
if (tradeManager != null) {
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
public void setPaymentReceivedMessageState(MessageState paymentReceivedMessageStateProperty) {
|
||||
this.paymentReceivedMessageStateProperty.set(paymentReceivedMessageStateProperty);
|
||||
if (tradeManager != null) {
|
||||
tradeManager.requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isDepositsConfirmedMessageAcked() {
|
||||
return depositsConfirmedMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
|
||||
}
|
||||
|
||||
public boolean isPaymentSentMessageAcked() {
|
||||
return paymentSentMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
|
||||
}
|
||||
|
||||
public boolean isPaymentReceivedMessageReceived() {
|
||||
return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message toProtoMessage() {
|
||||
final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder();
|
||||
|
@ -221,6 +299,9 @@ public final class TradePeer implements PersistablePayload {
|
|||
Optional.ofNullable(payoutTxFee).ifPresent(e -> builder.setPayoutTxFee(payoutTxFee));
|
||||
Optional.ofNullable(payoutAmount).ifPresent(e -> builder.setPayoutAmount(payoutAmount));
|
||||
builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked);
|
||||
builder.setDepositsConfirmedMessageState(depositsConfirmedMessageStateProperty.get().name());
|
||||
builder.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name());
|
||||
builder.setPaymentReceivedMessageState(paymentReceivedMessageStateProperty.get().name());
|
||||
|
||||
builder.setCurrentDate(currentDate);
|
||||
return builder.build();
|
||||
|
@ -270,6 +351,19 @@ public final class TradePeer implements PersistablePayload {
|
|||
tradePeer.setUnsignedPayoutTxHex(ProtoUtil.stringOrNullFromProto(proto.getUnsignedPayoutTxHex()));
|
||||
tradePeer.setPayoutTxFee(BigInteger.valueOf(proto.getPayoutTxFee()));
|
||||
tradePeer.setPayoutAmount(BigInteger.valueOf(proto.getPayoutAmount()));
|
||||
|
||||
String depositsConfirmedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDepositsConfirmedMessageState());
|
||||
MessageState depositsConfirmedMessageState = ProtoUtil.enumFromProto(MessageState.class, depositsConfirmedMessageStateString);
|
||||
tradePeer.setDepositsConfirmedMessageState(depositsConfirmedMessageState);
|
||||
|
||||
String paymentSentMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageState());
|
||||
MessageState paymentSentMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateString);
|
||||
tradePeer.setPaymentSentMessageState(paymentSentMessageState);
|
||||
|
||||
String paymentReceivedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentReceivedMessageState());
|
||||
MessageState paymentReceivedMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateString);
|
||||
tradePeer.setPaymentReceivedMessageState(paymentReceivedMessageState);
|
||||
|
||||
return tradePeer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -252,6 +252,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
|||
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
|
||||
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
|
||||
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
|
||||
|
||||
// reprocess applicable messages
|
||||
trade.reprocessApplicableMessages();
|
||||
}
|
||||
|
||||
// send deposits confirmed message if applicable
|
||||
|
@ -281,6 +284,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
|||
}, trade.getId());
|
||||
}
|
||||
|
||||
public boolean needsToResendPaymentReceivedMessages() {
|
||||
return false; // seller protocol overrides
|
||||
}
|
||||
|
||||
public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) {
|
||||
if (trade.isShutDownStarted()) return;
|
||||
ThreadUtils.execute(() -> {
|
||||
|
@ -291,7 +298,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
|||
return;
|
||||
}
|
||||
|
||||
log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId());
|
||||
log.warn("Reprocessing PaymentSentMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
|
||||
handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError);
|
||||
}
|
||||
}, trade.getId());
|
||||
|
@ -307,7 +314,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
|||
return;
|
||||
}
|
||||
|
||||
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
|
||||
log.warn("Reprocessing PaymentReceivedMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
|
||||
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
|
||||
}
|
||||
}, trade.getId());
|
||||
|
@ -710,47 +717,76 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
|
|||
|
||||
private void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
|
||||
|
||||
// handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) {
|
||||
if (trade.getTradePeer(sender) == trade.getSeller()) {
|
||||
processModel.setPaymentSentAckMessageSeller(ackMessage);
|
||||
trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
|
||||
processModel.setPaymentSentAckMessageArbitrator(ackMessage);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
} else if (!ackMessage.isSuccess()) {
|
||||
String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage();
|
||||
log.warn(err);
|
||||
return; // log error and ignore nack if not seller
|
||||
}
|
||||
// get trade peer
|
||||
TradePeer peer = trade.getTradePeer(sender);
|
||||
if (peer == null) {
|
||||
if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator();
|
||||
else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker();
|
||||
else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker();
|
||||
}
|
||||
if (peer == null) {
|
||||
if (ackMessage.isSuccess()) log.warn("Received AckMessage from unknown peer for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
|
||||
else log.warn("Received AckMessage with error state from unknown peer for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
if (ackMessage.isSuccess()) {
|
||||
log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
|
||||
// update sender's node address
|
||||
if (!peer.getNodeAddress().equals(sender)) {
|
||||
log.info("Updating peer's node address from {} to {} using ACK message to {}", peer.getNodeAddress(), sender, ackMessage.getSourceMsgClassName());
|
||||
peer.setNodeAddress(sender);
|
||||
}
|
||||
|
||||
// handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
|
||||
TradePeer peer = trade.getTradePeer(sender);
|
||||
if (peer == null) {
|
||||
|
||||
// get the applicable peer based on the sourceUid
|
||||
if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator();
|
||||
else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker();
|
||||
else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker();
|
||||
}
|
||||
if (peer == null) log.warn("Received AckMesage for DepositsConfirmedMessage for unknown peer: " + sender);
|
||||
else peer.setDepositsConfirmedMessageAcked(true);
|
||||
}
|
||||
} else {
|
||||
log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
|
||||
|
||||
// set trade state on deposit request nack
|
||||
if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) {
|
||||
// set trade state on deposit request nack
|
||||
if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) {
|
||||
if (!ackMessage.isSuccess()) {
|
||||
trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
}
|
||||
|
||||
// handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
|
||||
peer.setDepositsConfirmedAckMessage(ackMessage);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
// handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) {
|
||||
if (trade.getTradePeer(sender) == trade.getSeller()) {
|
||||
trade.getSeller().setPaymentSentAckMessage(ackMessage);
|
||||
if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
|
||||
else trade.setState(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
|
||||
trade.getArbitrator().setPaymentSentAckMessage(ackMessage);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
} else {
|
||||
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// handle ack for PaymentReceivedMessage, which automatically re-sends if not ACKed in a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(PaymentReceivedMessage.class.getSimpleName())) {
|
||||
if (trade.getTradePeer(sender) == trade.getBuyer()) {
|
||||
trade.getBuyer().setPaymentReceivedAckMessage(ackMessage);
|
||||
if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_RECEIVED_PAYMENT_RECEIVED_MSG);
|
||||
else trade.setState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
|
||||
trade.getArbitrator().setPaymentReceivedAckMessage(ackMessage);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
} else {
|
||||
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// generic handling
|
||||
if (ackMessage.isSuccess()) {
|
||||
log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
|
||||
} else {
|
||||
log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
|
||||
handleError(ackMessage.getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,26 +142,26 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
|
|||
|
||||
@Override
|
||||
protected void setStateSent() {
|
||||
if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
|
||||
getReceiver().setPaymentSentMessageState(MessageState.SENT);
|
||||
tryToSendAgainLater();
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateArrived() {
|
||||
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
|
||||
getReceiver().setPaymentSentMessageState(MessageState.ARRIVED);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateStoredInMailbox() {
|
||||
trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG);
|
||||
getReceiver().setPaymentSentMessageState(MessageState.STORED_IN_MAILBOX);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateFault() {
|
||||
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
|
||||
getReceiver().setPaymentSentMessageState(MessageState.FAILED);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
|
@ -170,7 +170,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
|
|||
timer.stop();
|
||||
}
|
||||
if (listener != null) {
|
||||
processModel.getPaymentSentMessageStatePropertySeller().removeListener(listener);
|
||||
trade.getSeller().getPaymentReceivedMessageStateProperty().removeListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,7 +185,6 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
|
|||
return;
|
||||
}
|
||||
|
||||
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
|
||||
if (timer != null) {
|
||||
timer.stop();
|
||||
}
|
||||
|
@ -194,8 +193,8 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
|
|||
|
||||
if (resendCounter == 0) {
|
||||
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
|
||||
processModel.getPaymentSentMessageStatePropertySeller().addListener(listener);
|
||||
onMessageStateChange(processModel.getPaymentSentMessageStatePropertySeller().get());
|
||||
getReceiver().getPaymentSentMessageStateProperty().addListener(listener);
|
||||
onMessageStateChange(getReceiver().getPaymentSentMessageStateProperty().get());
|
||||
}
|
||||
|
||||
// first re-send is after 2 minutes, then increase the delay exponentially
|
||||
|
@ -212,12 +211,12 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
|
|||
}
|
||||
|
||||
private void onMessageStateChange(MessageState newValue) {
|
||||
if (newValue == MessageState.ACKNOWLEDGED) {
|
||||
trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
if (isAckedByReceiver()) {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract boolean isAckedByReceiver();
|
||||
protected boolean isAckedByReceiver() {
|
||||
return getReceiver().isPaymentSentMessageAcked();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,26 +38,7 @@ public class BuyerSendPaymentSentMessageToArbitrator extends BuyerSendPaymentSen
|
|||
|
||||
@Override
|
||||
protected void setStateSent() {
|
||||
super.setStateSent();
|
||||
complete(); // don't wait for message to arbitrator
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateFault() {
|
||||
// state only updated on seller message
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateStoredInMailbox() {
|
||||
// state only updated on seller message
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateArrived() {
|
||||
// state only updated on seller message
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAckedByReceiver() {
|
||||
return trade.getProcessModel().isPaymentSentMessageAckedByArbitrator();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package haveno.core.trade.protocol.tasks;
|
||||
|
||||
import haveno.common.taskrunner.TaskRunner;
|
||||
import haveno.core.network.MessageState;
|
||||
import haveno.core.trade.Trade;
|
||||
import haveno.core.trade.messages.TradeMessage;
|
||||
import haveno.core.trade.protocol.TradePeer;
|
||||
|
@ -40,25 +39,25 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
|
|||
|
||||
@Override
|
||||
protected void setStateSent() {
|
||||
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.SENT);
|
||||
if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
|
||||
super.setStateSent();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateArrived() {
|
||||
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.ARRIVED);
|
||||
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
|
||||
super.setStateArrived();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateStoredInMailbox() {
|
||||
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.STORED_IN_MAILBOX);
|
||||
trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG);
|
||||
super.setStateStoredInMailbox();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateFault() {
|
||||
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.FAILED);
|
||||
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
|
||||
super.setStateFault();
|
||||
}
|
||||
|
||||
|
@ -69,9 +68,4 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
|
|||
appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
|
||||
complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAckedByReceiver() {
|
||||
return trade.getProcessModel().isPaymentSentMessageAckedBySeller();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,11 +35,15 @@
|
|||
package haveno.core.trade.protocol.tasks;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
|
||||
import haveno.common.Timer;
|
||||
import haveno.common.UserThread;
|
||||
import haveno.common.crypto.PubKeyRing;
|
||||
import haveno.common.crypto.Sig;
|
||||
import haveno.common.taskrunner.TaskRunner;
|
||||
import haveno.core.account.sign.SignedWitness;
|
||||
import haveno.core.account.witness.AccountAgeWitnessService;
|
||||
import haveno.core.network.MessageState;
|
||||
import haveno.core.trade.HavenoUtils;
|
||||
import haveno.core.trade.Trade;
|
||||
import haveno.core.trade.messages.PaymentReceivedMessage;
|
||||
|
@ -47,15 +51,23 @@ import haveno.core.trade.messages.TradeMailboxMessage;
|
|||
import haveno.core.trade.protocol.TradePeer;
|
||||
import haveno.core.util.JsonUtil;
|
||||
import haveno.network.p2p.NodeAddress;
|
||||
import javafx.beans.value.ChangeListener;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessageTask {
|
||||
SignedWitness signedWitness = null;
|
||||
private SignedWitness signedWitness = null;
|
||||
private ChangeListener<MessageState> listener;
|
||||
private Timer timer;
|
||||
private static final int MAX_RESEND_ATTEMPTS = 20;
|
||||
private int delayInMin = 10;
|
||||
private int resendCounter = 0;
|
||||
|
||||
public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
|
||||
super(taskHandler, trade);
|
||||
|
@ -77,6 +89,13 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
|
|||
protected void run() {
|
||||
try {
|
||||
runInterceptHook();
|
||||
|
||||
// skip if already received
|
||||
if (isReceived()) {
|
||||
if (!isCompleted()) complete();
|
||||
return;
|
||||
}
|
||||
|
||||
super.run();
|
||||
} catch (Throwable t) {
|
||||
failed(t);
|
||||
|
@ -134,29 +153,85 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
|
|||
|
||||
@Override
|
||||
protected void setStateSent() {
|
||||
trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
|
||||
log.info("{} sent: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
|
||||
getReceiver().setPaymentReceivedMessageState(MessageState.SENT);
|
||||
tryToSendAgainLater();
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateFault() {
|
||||
trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
|
||||
log.error("{} failed: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
|
||||
getReceiver().setPaymentReceivedMessageState(MessageState.FAILED);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateStoredInMailbox() {
|
||||
trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG);
|
||||
log.info("{} stored in mailbox: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
|
||||
getReceiver().setPaymentReceivedMessageState(MessageState.STORED_IN_MAILBOX);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateArrived() {
|
||||
trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG);
|
||||
log.info("{} arrived: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
|
||||
getReceiver().setPaymentReceivedMessageState(MessageState.ARRIVED);
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
if (timer != null) {
|
||||
timer.stop();
|
||||
}
|
||||
if (listener != null) {
|
||||
trade.getBuyer().getPaymentReceivedMessageStateProperty().removeListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void tryToSendAgainLater() {
|
||||
|
||||
// skip if already received
|
||||
if (isReceived()) return;
|
||||
|
||||
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
|
||||
cleanup();
|
||||
log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (timer != null) {
|
||||
timer.stop();
|
||||
}
|
||||
|
||||
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
|
||||
|
||||
if (resendCounter == 0) {
|
||||
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
|
||||
getReceiver().getPaymentReceivedMessageStateProperty().addListener(listener);
|
||||
onMessageStateChange(getReceiver().getPaymentReceivedMessageStateProperty().get());
|
||||
}
|
||||
|
||||
// first re-send is after 2 minutes, then increase the delay exponentially
|
||||
if (resendCounter == 0) {
|
||||
int shortDelay = 2;
|
||||
log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
|
||||
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
|
||||
} else {
|
||||
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
|
||||
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
|
||||
delayInMin = (int) ((double) delayInMin * 1.5);
|
||||
}
|
||||
resendCounter++;
|
||||
}
|
||||
|
||||
private void onMessageStateChange(MessageState newValue) {
|
||||
if (isReceived()) {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean isReceived() {
|
||||
return getReceiver().isPaymentReceivedMessageReceived();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,30 @@ public class SellerSendPaymentReceivedMessageToBuyer extends SellerSendPaymentRe
|
|||
return trade.getBuyer();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateSent() {
|
||||
trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
|
||||
super.setStateSent();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateFault() {
|
||||
trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
|
||||
super.setStateFault();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateStoredInMailbox() {
|
||||
trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG);
|
||||
super.setStateStoredInMailbox();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateArrived() {
|
||||
trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG);
|
||||
super.setStateArrived();
|
||||
}
|
||||
|
||||
// continue execution on fault so payment received message is sent to arbitrator
|
||||
@Override
|
||||
protected void onFault(String errorMessage, TradeMessage message) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import haveno.common.Timer;
|
|||
import haveno.common.UserThread;
|
||||
import haveno.common.crypto.PubKeyRing;
|
||||
import haveno.common.taskrunner.TaskRunner;
|
||||
import haveno.core.network.MessageState;
|
||||
import haveno.core.trade.HavenoUtils;
|
||||
import haveno.core.trade.Trade;
|
||||
import haveno.core.trade.messages.DepositsConfirmedMessage;
|
||||
|
@ -52,8 +53,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
|
|||
try {
|
||||
runInterceptHook();
|
||||
|
||||
// skip if already acked by receiver
|
||||
if (isAckedByReceiver()) {
|
||||
// skip if already acked or payout published
|
||||
if (isAckedByReceiver() || trade.isPayoutPublished()) {
|
||||
if (!isCompleted()) complete();
|
||||
return;
|
||||
}
|
||||
|
@ -64,11 +65,17 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected abstract NodeAddress getReceiverNodeAddress();
|
||||
protected abstract TradePeer getReceiver();
|
||||
|
||||
@Override
|
||||
protected abstract PubKeyRing getReceiverPubKeyRing();
|
||||
protected NodeAddress getReceiverNodeAddress() {
|
||||
return getReceiver().getNodeAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PubKeyRing getReceiverPubKeyRing() {
|
||||
return getReceiver().getPubKeyRing();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
|
||||
|
@ -97,23 +104,24 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
|
|||
|
||||
@Override
|
||||
protected void setStateSent() {
|
||||
getReceiver().setDepositsConfirmedMessageState(MessageState.SENT);
|
||||
tryToSendAgainLater();
|
||||
processModel.getTradeManager().requestPersistence();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateArrived() {
|
||||
// no additional handling
|
||||
getReceiver().setDepositsConfirmedMessageState(MessageState.ARRIVED);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateStoredInMailbox() {
|
||||
// no additional handling
|
||||
getReceiver().setDepositsConfirmedMessageState(MessageState.STORED_IN_MAILBOX);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setStateFault() {
|
||||
// no additional handling
|
||||
getReceiver().setDepositsConfirmedMessageState(MessageState.FAILED);
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
|
@ -151,7 +159,6 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
|
|||
}
|
||||
|
||||
private boolean isAckedByReceiver() {
|
||||
TradePeer peer = trade.getTradePeer(getReceiverNodeAddress());
|
||||
return peer.isDepositsConfirmedMessageAcked();
|
||||
return getReceiver().isDepositsConfirmedMessageAcked();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,9 @@
|
|||
|
||||
package haveno.core.trade.protocol.tasks;
|
||||
|
||||
import haveno.common.crypto.PubKeyRing;
|
||||
import haveno.common.taskrunner.TaskRunner;
|
||||
import haveno.core.trade.Trade;
|
||||
import haveno.network.p2p.NodeAddress;
|
||||
import haveno.core.trade.protocol.TradePeer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
|
@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToArbitrator extends SendDepositsConfir
|
|||
}
|
||||
|
||||
@Override
|
||||
public NodeAddress getReceiverNodeAddress() {
|
||||
return trade.getArbitrator().getNodeAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PubKeyRing getReceiverPubKeyRing() {
|
||||
return trade.getArbitrator().getPubKeyRing();
|
||||
protected TradePeer getReceiver() {
|
||||
return trade.getArbitrator();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,9 @@
|
|||
|
||||
package haveno.core.trade.protocol.tasks;
|
||||
|
||||
import haveno.common.crypto.PubKeyRing;
|
||||
import haveno.common.taskrunner.TaskRunner;
|
||||
import haveno.core.trade.Trade;
|
||||
import haveno.network.p2p.NodeAddress;
|
||||
import haveno.core.trade.protocol.TradePeer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
|
@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToBuyer extends SendDepositsConfirmedMe
|
|||
}
|
||||
|
||||
@Override
|
||||
public NodeAddress getReceiverNodeAddress() {
|
||||
return trade.getBuyer().getNodeAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PubKeyRing getReceiverPubKeyRing() {
|
||||
return trade.getBuyer().getPubKeyRing();
|
||||
protected TradePeer getReceiver() {
|
||||
return trade.getBuyer();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,9 @@
|
|||
|
||||
package haveno.core.trade.protocol.tasks;
|
||||
|
||||
import haveno.common.crypto.PubKeyRing;
|
||||
import haveno.common.taskrunner.TaskRunner;
|
||||
import haveno.core.trade.Trade;
|
||||
import haveno.network.p2p.NodeAddress;
|
||||
import haveno.core.trade.protocol.TradePeer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
|
@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToSeller extends SendDepositsConfirmedM
|
|||
}
|
||||
|
||||
@Override
|
||||
public NodeAddress getReceiverNodeAddress() {
|
||||
return trade.getSeller().getNodeAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PubKeyRing getReceiverPubKeyRing() {
|
||||
return trade.getSeller().getPubKeyRing();
|
||||
protected TradePeer getReceiver() {
|
||||
return trade.getSeller();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -200,7 +200,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
|
|||
payoutStateSubscription = EasyBind.subscribe(trade.payoutStateProperty(), state -> {
|
||||
onPayoutStateChanged(state);
|
||||
});
|
||||
messageStateSubscription = EasyBind.subscribe(trade.getProcessModel().getPaymentSentMessageStatePropertySeller(), this::onPaymentSentMessageStateChanged);
|
||||
messageStateSubscription = EasyBind.subscribe(trade.getSeller().getPaymentSentMessageStateProperty(), this::onPaymentSentMessageStateChanged);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -425,6 +425,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
|
|||
case SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG:
|
||||
case SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG:
|
||||
case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG:
|
||||
case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG:
|
||||
sellerState.set(trade.isPayoutPublished() ? SellerState.STEP4 : SellerState.STEP3);
|
||||
break;
|
||||
|
||||
|
|
|
@ -135,6 +135,7 @@ public class SellerStep3View extends TradeStepView {
|
|||
statusLabel.setText(Res.get("shared.messageStoredInMailbox"));
|
||||
break;
|
||||
case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG:
|
||||
case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG:
|
||||
busyAnimation.stop();
|
||||
statusLabel.setText(Res.get("shared.messageArrived"));
|
||||
break;
|
||||
|
|
|
@ -1465,6 +1465,7 @@ message Trade {
|
|||
SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG = 24;
|
||||
SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG = 25;
|
||||
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG = 26;
|
||||
BUYER_RECEIVED_PAYMENT_RECEIVED_MSG = 27;
|
||||
}
|
||||
|
||||
enum Phase {
|
||||
|
@ -1568,8 +1569,8 @@ message ProcessModel {
|
|||
bytes payout_tx_signature = 4;
|
||||
bool use_savings_wallet = 5;
|
||||
int64 funds_needed_for_trade = 6;
|
||||
string payment_sent_message_state_seller = 7;
|
||||
string payment_sent_message_state_arbitrator = 8;
|
||||
string payment_sent_message_state_seller = 7 [deprecated = true];
|
||||
string payment_sent_message_state_arbitrator = 8 [deprecated = true];
|
||||
bytes maker_signature = 9;
|
||||
TradePeer maker = 10;
|
||||
TradePeer taker = 11;
|
||||
|
@ -1613,7 +1614,7 @@ message TradePeer {
|
|||
string made_multisig_hex = 31;
|
||||
string exchanged_multisig_hex = 32;
|
||||
string updated_multisig_hex = 33;
|
||||
bool deposits_confirmed_message_acked = 34;
|
||||
bool deposits_confirmed_message_acked = 34 [deprecated = true];
|
||||
string deposit_tx_hash = 35;
|
||||
string deposit_tx_hex = 36;
|
||||
string deposit_tx_key = 37;
|
||||
|
@ -1622,6 +1623,9 @@ message TradePeer {
|
|||
string unsigned_payout_tx_hex = 40;
|
||||
int64 payout_tx_fee = 41;
|
||||
int64 payout_amount = 42;
|
||||
string deposits_confirmed_message_state = 43;
|
||||
string payment_sent_message_state = 44;
|
||||
string payment_received_message_state = 45;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in a new issue