fix issues going offline while completing trades

This commit is contained in:
woodser 2022-08-15 10:08:41 -04:00
parent 12e3e3507e
commit 2f1f1a788b
9 changed files with 93 additions and 135 deletions

View file

@ -39,6 +39,7 @@ import bisq.core.util.ParsingUtils;
import bisq.core.util.VolumeUtil;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.common.UserThread;
import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.ProtoUtil;
@ -647,6 +648,13 @@ public abstract class Trade implements Tradable, Model {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void setMyNodeAddress() {
if (this instanceof MakerTrade) makerNodeAddress = P2PService.getMyNodeAddress();
else if (this instanceof TakerTrade) takerNodeAddress = P2PService.getMyNodeAddress();
else if (this instanceof ArbitratorTrade) arbitratorNodeAddress = P2PService.getMyNodeAddress();
else throw new RuntimeException("Must be maker, taker, or arbitrator to set own address");
}
public void setTradingPeerNodeAddress(NodeAddress peerAddress) {
if (this instanceof MakerTrade) takerNodeAddress = peerAddress;
else if (this instanceof TakerTrade) makerNodeAddress = peerAddress;

View file

@ -52,7 +52,9 @@ public abstract class BuyerProtocol extends DisputeProtocol {
@Override
protected void onInitialized() {
super.onInitialized();
// TODO: run with trade lock and latch, otherwise getting invalid transition warnings on startup after offline trades
given(phase(Trade.Phase.DEPOSITS_PUBLISHED)
.with(BuyerEvent.STARTUP))
.setup(tasks(SetupDepositTxsListener.class))
@ -90,25 +92,30 @@ public abstract class BuyerProtocol extends DisputeProtocol {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
BuyerEvent event = BuyerEvent.PAYMENT_SENT;
expect(phase(Trade.Phase.DEPOSITS_UNLOCKED)
.with(event)
.preCondition(trade.confirmPermitted()))
.setup(tasks(ApplyFilter.class,
//UpdateMultisigWithTradingPeer.class, // TODO (woodser): can use this to test protocol with updated multisig from peer. peer should attempt to send updated multisig hex earlier as part of protocol. cannot use with countdown latch because response comes back in a separate thread and blocks on trade
BuyerPreparesPaymentSentMessage.class,
//BuyerSetupPayoutTxListener.class,
BuyerSendsPaymentSentMessage.class) // don't latch trade because this blocks and runs in background
.using(new TradeTaskRunner(trade,
() -> {
this.errorMessageHandler = null;
handleTaskRunnerSuccess(event);
resultHandler.handleResult();
},
(errorMessage) -> {
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT))
.executeTasks(true);
try {
expect(anyPhase(Trade.Phase.DEPOSITS_UNLOCKED, Trade.Phase.PAYMENT_SENT)
.with(event)
.preCondition(trade.confirmPermitted()))
.setup(tasks(ApplyFilter.class,
//UpdateMultisigWithTradingPeer.class, // TODO (woodser): can use this to test protocol with updated multisig from peer. peer should attempt to send updated multisig hex earlier as part of protocol. cannot use with countdown latch because response comes back in a separate thread and blocks on trade
BuyerPreparesPaymentSentMessage.class,
//BuyerSetupPayoutTxListener.class,
BuyerSendsPaymentSentMessage.class) // don't latch trade because this blocks and runs in background
.using(new TradeTaskRunner(trade,
() -> {
this.errorMessageHandler = null;
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_PAYMENT_SENT))
.executeTasks(true);
} catch (Exception e) {
errorMessageHandler.handleErrorMessage("Error confirming payment sent: " + e.getMessage());
unlatchTrade();
}
awaitTradeLatch();
}
}).start();

View file

@ -288,11 +288,12 @@ public class FluentProtocol {
event.name() + " event" :
"";
if (isPhaseValid) {
String info = MessageFormat.format("We received a {0} at phase {1} and state {2}, tradeId={3}",
String info = MessageFormat.format("We received a {0} at phase {1} and state {2}, tradeId={3}, peer={4}",
trigger,
trade.getPhase(),
trade.getState(),
trade.getId());
trade.getId(),
this.peer);
log.info(info);
return Result.VALID.info(info);
} else {

View file

@ -48,6 +48,8 @@ public abstract class SellerProtocol extends DisputeProtocol {
protected void onInitialized() {
super.onInitialized();
// TODO: run with trade lock and latch, otherwise getting invalid transition warnings on startup after offline trades
given(phase(Trade.Phase.DEPOSITS_PUBLISHED)
.with(BuyerEvent.STARTUP))
.setup(tasks(SetupDepositTxsListener.class))
@ -124,22 +126,27 @@ public abstract class SellerProtocol extends DisputeProtocol {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
SellerEvent event = SellerEvent.PAYMENT_RECEIVED;
expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED)
.with(event)
.preCondition(trade.confirmPermitted()))
.setup(tasks(
ApplyFilter.class,
SellerPreparesPaymentReceivedMessage.class,
SellerSendsPaymentReceivedMessage.class)
.using(new TradeTaskRunner(trade, () -> {
this.errorMessageHandler = null;
handleTaskRunnerSuccess(event);
resultHandler.handleResult();
}, (errorMessage) -> {
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_PAYMENT_RECEIPT))
.executeTasks(true);
try {
expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED)
.with(event)
.preCondition(trade.confirmPermitted()))
.setup(tasks(
ApplyFilter.class,
SellerPreparesPaymentReceivedMessage.class,
SellerSendsPaymentReceivedMessage.class)
.using(new TradeTaskRunner(trade, () -> {
this.errorMessageHandler = null;
handleTaskRunnerSuccess(event);
resultHandler.handleResult();
}, (errorMessage) -> {
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_PAYMENT_RECEIPT))
.executeTasks(true);
} catch (Exception e) {
errorMessageHandler.handleErrorMessage("Error confirming payment received: " + e.getMessage());
unlatchTrade();
}
awaitTradeLatch();
}
}).start();

View file

@ -250,7 +250,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId());
System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest() " + trade.getId());
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
@ -566,11 +566,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
///////////////////////////////////////////////////////////////////////////////////////////
private PubKeyRing getPeersPubKeyRing(NodeAddress peer) {
trade.setMyNodeAddress(); // TODO: this is a hack to update my node address before verifying the message
if (peer.equals(trade.getArbitratorNodeAddress())) return trade.getArbitratorPubKeyRing();
else if (peer.equals(trade.getMakerNodeAddress())) return trade.getMakerPubKeyRing();
else if (peer.equals(trade.getTakerNodeAddress())) return trade.getTakerPubKeyRing();
else {
log.error("Cannot get peer's pub key ring because peer is not maker, taker, or arbitrator");
log.warn("Cannot get peer's pub key ring because peer is not maker, taker, or arbitrator. Their address might have changed: " + peer);
return null;
}
}
@ -582,16 +583,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
private boolean isPubKeyValid(DecryptedMessageWithPubKey message, NodeAddress sender) {
// We can only validate the peers pubKey if we have it already. If we are the taker we get it from the offer
// Otherwise it depends on the state of the trade protocol if we have received the peers pubKeyRing already.
PubKeyRing peersPubKeyRing = getPeersPubKeyRing(sender);
boolean isValid = true; // TODO (woodser): this returns valid=true even if peer's pub key ring is null?
if (peersPubKeyRing != null &&
!message.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) {
isValid = false;
log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer.");
}
return isValid;
// not invalid if pub key rings are unknown
if (trade.getTradingPeer().getPubKeyRing() == null && trade.getArbitratorPubKeyRing() == null) return true;
// valid if peer's pub key ring
if (trade.getTradingPeer().getPubKeyRing() != null && message.getSignaturePubKey().equals(trade.getTradingPeer().getPubKeyRing().getSignaturePubKey())) return true;
// valid if arbitrator's pub key ring
if (trade.getArbitratorPubKeyRing() != null && message.getSignaturePubKey().equals(trade.getArbitratorPubKeyRing().getSignaturePubKey())) return true;
// invalid
log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer and arbitrator.");
return false;
}
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -123,7 +123,8 @@ public class ArbitratorProcessesDepositRequest extends TradeTask {
sendDepositResponse(trade.getMakerNodeAddress(), trade.getMakerPubKeyRing(), response);
sendDepositResponse(trade.getTakerNodeAddress(), trade.getTakerPubKeyRing(), response);
} else {
log.info("Arbitrator waiting for deposit request from maker and taker for trade " + trade.getId());
if (processModel.getMaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from maker for trade " + trade.getId());
if (processModel.getTaker().getDepositTxHex() == null) log.info("Arbitrator waiting for deposit request from taker for trade " + trade.getId());
}
// TODO (woodser): request persistence?

View file

@ -23,15 +23,11 @@ import bisq.core.network.MessageState;
import bisq.core.trade.Trade;
import bisq.core.trade.messages.PaymentSentMessage;
import bisq.core.trade.messages.TradeMailboxMessage;
import bisq.core.trade.messages.TradeMessage;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.taskrunner.TaskRunner;
import javafx.beans.value.ChangeListener;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
@ -45,9 +41,6 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask {
private static final int MAX_RESEND_ATTEMPTS = 10;
private int delayInMin = 15;
private int resendCounter = 0;
private PaymentSentMessage message;
private ChangeListener<MessageState> listener;
private Timer timer;
@ -89,46 +82,24 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask {
if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
}
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateArrived() {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
// the message has arrived but we're ultimately waiting for an AckMessage response
if (!trade.isPayoutPublished()) {
tryToSendAgainLater();
}
}
// We override the default behaviour for onStoredInMailbox and do not call complete
@Override
protected void onStoredInMailbox() {
setStateStoredInMailbox();
}
@Override
protected void setStateStoredInMailbox() {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG);
if (!trade.isPayoutPublished()) {
tryToSendAgainLater();
}
processModel.getTradeManager().requestPersistence();
}
// We override the default behaviour for onFault and do not call appendToErrorMessage and failed
@Override
protected void onFault(String errorMessage, TradeMessage message) {
setStateFault();
// TODO: schedule repeat sending like bisq?
}
@Override
protected void setStateFault() {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
if (!trade.isPayoutPublished()) {
tryToSendAgainLater();
}
processModel.getTradeManager().requestPersistence();
}
@ -136,7 +107,6 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask {
protected void run() {
try {
runInterceptHook();
super.run();
} catch (Throwable t) {
failed(t);
@ -145,13 +115,6 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask {
}
}
// complete() is called from base class SendMailboxMessageTask=>onArrived()
// We override the default behaviour for complete and keep this task open until receipt of the AckMessage
@Override
protected void complete() {
onMessageStateChange(processModel.getPaymentStartedMessageStateProperty().get()); // check for AckMessage
}
private void cleanup() {
if (timer != null) {
timer.stop();
@ -160,43 +123,4 @@ public class BuyerSendsPaymentSentMessage extends SendMailboxMessageTask {
processModel.getPaymentStartedMessageStateProperty().removeListener(listener);
}
}
private void tryToSendAgainLater() {
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the PaymentSentMessage to the peer. " +
"We stop now and complete the protocol task.");
complete();
return;
}
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
if (resendCounter == 0) {
// We want to register listener only once
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
processModel.getPaymentStartedMessageStateProperty().addListener(listener);
onMessageStateChange(processModel.getPaymentStartedMessageStateProperty().get());
}
delayInMin = delayInMin * 2;
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
// Once we receive an ACK from our msg we know the peer has received the msg and we stop.
if (newValue == MessageState.ACKNOWLEDGED) {
// We treat a ACK like BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
processModel.getTradeManager().requestPersistence();
cleanup();
super.complete(); // received AckMessage, complete this task
}
}
}

View file

@ -119,12 +119,12 @@ public class ProcessInitMultisigRequest extends TradeTask {
// import exchanged multisig keys if applicable
if (processModel.getMultisigAddress() == null && peers[0].getExchangedMultisigHex() != null && peers[1].getExchangedMultisigHex() != null) {
log.info("Importing exchanged multisig hex for trade {}", trade.getId());
MoneroMultisigInitResult result = multisigWallet.exchangeMultisigKeys(Arrays.asList(peers[0].getExchangedMultisigHex(), peers[1].getExchangedMultisigHex()), xmrWalletService.getWalletPassword());
processModel.setMultisigAddress(result.getAddress());
trade.setStateIfValidTransitionTo(Trade.State.MULTISIG_COMPLETED);
processModel.getProvider().getXmrWalletService().closeMultisigWallet(trade.getId()); // save and close multisig wallet once it's created
}
log.info("Importing exchanged multisig hex for trade {}", trade.getId());
MoneroMultisigInitResult result = multisigWallet.exchangeMultisigKeys(Arrays.asList(peers[0].getExchangedMultisigHex(), peers[1].getExchangedMultisigHex()), xmrWalletService.getWalletPassword());
processModel.setMultisigAddress(result.getAddress());
processModel.getProvider().getXmrWalletService().closeMultisigWallet(trade.getId()); // save and close multisig wallet once it's created
trade.setStateIfValidTransitionTo(Trade.State.MULTISIG_COMPLETED);
}
// update multisig participants if new state to communicate
if (updateParticipants) {

View file

@ -381,7 +381,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope());
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
decryptedDirectMessageListeners.forEach(e -> {
try {
e.onDirectMessage(decryptedMsg, nodeAddress);
} catch (Exception e2) {
e2.printStackTrace();
}
}),
() -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");