re-send deposits confirmed messages until acked

This commit is contained in:
woodser 2023-04-29 07:40:02 -04:00
parent 5c1cfdcff9
commit 1fdb02bd1f
8 changed files with 92 additions and 35 deletions

View file

@ -220,9 +220,9 @@ public class OfferFilterService {
public boolean hasValidArbitrator(Offer offer) {
Arbitrator arbitrator = user.getAcceptedArbitratorByAddress(offer.getOfferPayload().getArbitratorSigner());
if (arbitrator == null) {
if (arbitrator == null && offer.getOfferPayload().getArbitratorSigner() != null) {
List<NodeAddress> arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList());
log.warn("No arbitrator registered with offer's signer. offerId={}. Accepted arbitrators={}", offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses);
log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses);
}
return arbitrator != null;
}

View file

@ -658,17 +658,6 @@ public abstract class Trade implements Tradable, Model {
xmrWalletService.addWalletListener(idlePayoutSyncer);
}
// send deposit confirmed message on startup or event
if (isDepositsConfirmed()) {
new Thread(() -> getProtocol().maybeSendDepositsConfirmedMessages()).start();
} else {
EasyBind.subscribe(stateProperty(), state -> {
if (isDepositsConfirmed()) {
new Thread(() -> getProtocol().maybeSendDepositsConfirmedMessages()).start();
}
});
}
// reprocess pending payout messages
this.getProtocol().maybeReprocessPaymentReceivedMessage(false);
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);

View file

@ -487,7 +487,6 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
if (getTradeProtocol(trade) != null) return;
initTradeAndProtocol(trade, createTradeProtocol(trade));
requestPersistence();
listenForCleanup(trade);
}
private void initTradeAndProtocol(Trade trade, TradeProtocol tradeProtocol) {

View file

@ -148,10 +148,6 @@ public class ProcessModel implements Model, PersistablePayload {
@Setter
private String multisigAddress;
@Nullable
@Getter
@Setter
private boolean isDepositsConfirmedMessagesDelivered;
@Nullable
@Setter
@Getter
private PaymentSentMessage paymentSentMessage;
@ -207,8 +203,7 @@ public class ProcessModel implements Model, PersistablePayload {
.setFundsNeededForTrade(fundsNeededForTrade)
.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name())
.setBuyerPayoutAmountFromMediation(buyerPayoutAmountFromMediation)
.setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation)
.setDepositsConfirmedMessagesDelivered(isDepositsConfirmedMessagesDelivered);
.setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation);
Optional.ofNullable(maker).ifPresent(e -> builder.setMaker((protobuf.TradePeer) maker.toProtoMessage()));
Optional.ofNullable(taker).ifPresent(e -> builder.setTaker((protobuf.TradePeer) taker.toProtoMessage()));
Optional.ofNullable(arbitrator).ifPresent(e -> builder.setArbitrator((protobuf.TradePeer) arbitrator.toProtoMessage()));
@ -234,7 +229,6 @@ public class ProcessModel implements Model, PersistablePayload {
processModel.setFundsNeededForTrade(proto.getFundsNeededForTrade());
processModel.setBuyerPayoutAmountFromMediation(proto.getBuyerPayoutAmountFromMediation());
processModel.setSellerPayoutAmountFromMediation(proto.getSellerPayoutAmountFromMediation());
processModel.setDepositsConfirmedMessagesDelivered(proto.getDepositsConfirmedMessagesDelivered());
// nullable
processModel.setTakeOfferFeeTxId(ProtoUtil.stringOrNullFromProto(proto.getTakeOfferFeeTxId()));

View file

@ -119,6 +119,9 @@ public final class TradePeer implements PersistablePayload {
private long securityDeposit;
@Nullable
private String updatedMultisigHex;
@Getter
@Setter
boolean depositsConfirmedMessageAcked;
public TradePeer() {
}
@ -163,6 +166,7 @@ public final class TradePeer implements PersistablePayload {
Optional.ofNullable(depositTxKey).ifPresent(e -> builder.setDepositTxKey(depositTxKey));
Optional.ofNullable(securityDeposit).ifPresent(e -> builder.setSecurityDeposit(securityDeposit));
Optional.ofNullable(updatedMultisigHex).ifPresent(e -> builder.setUpdatedMultisigHex(updatedMultisigHex));
builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked);
builder.setCurrentDate(currentDate);
return builder.build();
@ -204,6 +208,7 @@ public final class TradePeer implements PersistablePayload {
tradePeer.setDepositTxKey(ProtoUtil.stringOrNullFromProto(proto.getDepositTxKey()));
tradePeer.setSecurityDeposit(BigInteger.valueOf(proto.getSecurityDeposit()));
tradePeer.setUpdatedMultisigHex(ProtoUtil.stringOrNullFromProto(proto.getUpdatedMultisigHex()));
tradePeer.setDepositsConfirmedMessageAcked(proto.getDepositsConfirmedMessageAcked());
return tradePeer;
}
}

View file

@ -75,7 +75,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class TradeProtocol implements DecryptedDirectMessageListener, DecryptedMailboxListener {
@ -255,6 +254,21 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
// send deposits confirmed message if applicable
maybeSendDepositsConfirmedMessage();
}
private void maybeSendDepositsConfirmedMessage() {
if (trade.isDepositsConfirmed()) {
new Thread(() -> maybeSendDepositsConfirmedMessages()).start();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (trade.isDepositsConfirmed()) {
new Thread(() -> maybeSendDepositsConfirmedMessages()).start();
}
});
}
}
public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
@ -617,6 +631,13 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
if (ackMessage.isSuccess()) {
log.info("Received AckMessage for {} from {} with tradeId {} and uid {}",
ackMessage.getSourceMsgClassName(), peer, trade.getId(), ackMessage.getSourceUid());
// handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
if (trade.getTradePeer(peer) != null) {
trade.getTradePeer(peer).setDepositsConfirmedMessageAcked(true);
}
}
} else {
String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ peer + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage();
log.warn(err);
@ -834,8 +855,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
return tradeMessage.getTradeId().equals(trade.getId());
} else if (message instanceof AckMessage) {
AckMessage ackMessage = (AckMessage) message;
return ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE &&
ackMessage.getSourceId().equals(trade.getId());
return ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE && ackMessage.getSourceId().equals(trade.getId());
} else {
return false;
}
@ -845,22 +865,15 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
if (trade.isShutDownStarted()) return;
synchronized (trade) {
if (!trade.isInitialized()) return; // skip if shutting down
if (trade.getProcessModel().isDepositsConfirmedMessagesDelivered()) return; // skip if already delivered
latchTrade();
expect(new Condition(trade))
.setup(tasks(getDepositsConfirmedTasks())
.using(new TradeTaskRunner(trade,
() -> {
trade.getProcessModel().setDepositsConfirmedMessagesDelivered(true);
handleTaskRunnerSuccess(null, null, "SendDepositsConfirmedMessages");
handleTaskRunnerSuccess(null, null, "maybeSendDepositsConfirmedMessages");
},
(errorMessage) -> {
// retry in 15 minutes
UserThread.runAfter(() -> {
maybeSendDepositsConfirmedMessages();
}, 15, TimeUnit.MINUTES);
handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage);
handleTaskRunnerFault(null, null, "maybeSendDepositsConfirmedMessages", errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();

View file

@ -17,12 +17,17 @@
package haveno.core.trade.protocol.tasks;
import java.util.concurrent.TimeUnit;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.DepositsConfirmedMessage;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.network.p2p.NodeAddress;
import lombok.extern.slf4j.Slf4j;
@ -31,6 +36,11 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTask {
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 10;
private int delayInMin = 10;
private int resendCounter = 0;
private DepositsConfirmedMessage message;
public SendDepositsConfirmedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
@ -41,6 +51,13 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
protected void run() {
try {
runInterceptHook();
// skip if already acked by receiver
if (ackedByReceiver()) {
complete();
return;
}
super.run();
} catch (Throwable t) {
failed(t);
@ -81,7 +98,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
@Override
protected void setStateSent() {
// no additional handling
tryToSendAgainLater();
processModel.getTradeManager().requestPersistence();
}
@Override
@ -98,4 +116,43 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
protected void setStateFault() {
// no additional handling
}
private void cleanup() {
if (timer != null) {
timer.stop();
}
}
private void tryToSendAgainLater() {
// skip if already acked
if (ackedByReceiver()) return;
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the DepositsConfirmedMessage to the peer. We stop trying to send the message.");
return;
}
if (timer != null) {
timer.stop();
}
// first re-send is after 2 minutes, then double the delay each iteration
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = delayInMin * 2;
}
resendCounter++;
}
private boolean ackedByReceiver() {
TradePeer peer = trade.getTradePeer(getReceiverNodeAddress());
return peer.isDepositsConfirmedMessageAcked();
}
}

View file

@ -1530,7 +1530,6 @@ message ProcessModel {
bool use_savings_wallet = 6;
int64 funds_needed_for_trade = 7;
string payment_sent_message_state = 8;
bool deposits_confirmed_messages_delivered = 9;
bytes maker_signature = 10;
TradePeer maker = 11;
TradePeer taker = 12;
@ -1576,6 +1575,7 @@ message TradePeer {
string deposit_tx_key = 1010;
int64 security_deposit = 1011;
string updated_multisig_hex = 1012;
bool deposits_confirmed_message_acked = 1013;
}
///////////////////////////////////////////////////////////////////////////////////////////