From 7062fa9e79ad6148d163d8fc46c526b992b9601d Mon Sep 17 00:00:00 2001 From: woodser Date: Sat, 26 Nov 2022 17:24:55 +0000 Subject: [PATCH] message handlers return synchronously --- .../java/bisq/core/trade/TradeManager.java | 28 +- .../core/trade/protocol/TradeProtocol.java | 380 +++++++++--------- 2 files changed, 199 insertions(+), 209 deletions(-) diff --git a/core/src/main/java/bisq/core/trade/TradeManager.java b/core/src/main/java/bisq/core/trade/TradeManager.java index 62a22644..6ed99926 100644 --- a/core/src/main/java/bisq/core/trade/TradeManager.java +++ b/core/src/main/java/bisq/core/trade/TradeManager.java @@ -227,19 +227,21 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi @Override public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) { NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); - if (networkEnvelope instanceof InitTradeRequest) { - handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); - } else if (networkEnvelope instanceof InitMultisigRequest) { - handleInitMultisigRequest((InitMultisigRequest) networkEnvelope, peer); - } else if (networkEnvelope instanceof SignContractRequest) { - handleSignContractRequest((SignContractRequest) networkEnvelope, peer); - } else if (networkEnvelope instanceof SignContractResponse) { - handleSignContractResponse((SignContractResponse) networkEnvelope, peer); - } else if (networkEnvelope instanceof DepositRequest) { - handleDepositRequest((DepositRequest) networkEnvelope, peer); - } else if (networkEnvelope instanceof DepositResponse) { - handleDepositResponse((DepositResponse) networkEnvelope, peer); - } + new Thread(() -> { + if (networkEnvelope instanceof InitTradeRequest) { + handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); + } else if (networkEnvelope instanceof InitMultisigRequest) { + handleInitMultisigRequest((InitMultisigRequest) networkEnvelope, peer); + } else if (networkEnvelope instanceof SignContractRequest) { + handleSignContractRequest((SignContractRequest) networkEnvelope, peer); + } else if (networkEnvelope instanceof SignContractResponse) { + handleSignContractResponse((SignContractResponse) networkEnvelope, peer); + } else if (networkEnvelope instanceof DepositRequest) { + handleDepositRequest((DepositRequest) networkEnvelope, peer); + } else if (networkEnvelope instanceof DepositResponse) { + handleDepositResponse((DepositResponse) networkEnvelope, peer); + } + }).start(); } diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index 1e744041..83645693 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -104,24 +104,24 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); - if (message instanceof DepositsConfirmedMessage) { - handle((DepositsConfirmedMessage) message, peerNodeAddress); - } else if (message instanceof PaymentSentMessage) { - handle((PaymentSentMessage) message, peerNodeAddress); - } else if (message instanceof PaymentReceivedMessage) { - handle((PaymentReceivedMessage) message, peerNodeAddress); - } + handle(message, peerNodeAddress); } protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) { log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); - if (message instanceof DepositsConfirmedMessage) { - handle((DepositsConfirmedMessage) message, peerNodeAddress); - } else if (message instanceof PaymentSentMessage) { - handle((PaymentSentMessage) message, peerNodeAddress); - } else if (message instanceof PaymentReceivedMessage) { - handle((PaymentReceivedMessage) message, peerNodeAddress); - } + handle(message, peerNodeAddress); + } + + private void handle(TradeMessage message, NodeAddress peerNodeAddress) { + new Thread(() -> { + if (message instanceof DepositsConfirmedMessage) { + handle((DepositsConfirmedMessage) message, peerNodeAddress); + } else if (message instanceof PaymentSentMessage) { + handle((PaymentSentMessage) message, peerNodeAddress); + } else if (message instanceof PaymentReceivedMessage) { + handle((PaymentReceivedMessage) message, peerNodeAddress); + } + }).start(); } @Override @@ -242,7 +242,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D // handle trade events EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.DEPOSIT_TXS_CONFIRMED_IN_BLOCKCHAIN) sendDepositsConfirmedMessage(); + if (state == Trade.State.DEPOSIT_TXS_CONFIRMED_IN_BLOCKCHAIN) { + new Thread(() -> sendDepositsConfirmedMessage()).start(); + } }); // initialize trade @@ -256,155 +258,145 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest()"); - new Thread(() -> { - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), request); - processModel.setTradeMessage(request); - expect(anyPhase(Trade.Phase.INIT) - .with(request) - .from(sender)) - .setup(tasks( - ProcessInitMultisigRequest.class, - MaybeSendSignContractRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, request); - }, - errorMessage -> { - handleTaskRunnerFault(sender, request, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } - }).start(); + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), request); + processModel.setTradeMessage(request); + expect(anyPhase(Trade.Phase.INIT) + .with(request) + .from(sender)) + .setup(tasks( + ProcessInitMultisigRequest.class, + MaybeSendSignContractRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, request); + }, + errorMessage -> { + handleTaskRunnerFault(sender, request, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } } public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() " + trade.getId()); - new Thread(() -> { - synchronized (trade) { + synchronized (trade) { + Validator.checkTradeId(processModel.getOfferId(), message); + if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { + latchTrade(); Validator.checkTradeId(processModel.getOfferId(), message); - if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) - .with(message) - .from(sender)) - .setup(tasks( - // TODO (woodser): validate request - ProcessSignContractRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, message); - }, - errorMessage -> { - handleTaskRunnerFault(sender, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) // extend timeout - .executeTasks(true); - awaitTradeLatch(); - } else { - // process sign contract request after multisig created - EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock - }); - } + processModel.setTradeMessage(message); + expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) + .with(message) + .from(sender)) + .setup(tasks( + // TODO (woodser): validate request + ProcessSignContractRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, message); + }, + errorMessage -> { + handleTaskRunnerFault(sender, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) // extend timeout + .executeTasks(true); + awaitTradeLatch(); + } else { + // process sign contract request after multisig created + EasyBind.subscribe(trade.stateProperty(), state -> { + if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock + }); } - }).start(); + } } public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() " + trade.getId()); - new Thread(() -> { - synchronized (trade) { + synchronized (trade) { + Validator.checkTradeId(processModel.getOfferId(), message); + if (trade.getState() == Trade.State.CONTRACT_SIGNED) { + latchTrade(); Validator.checkTradeId(processModel.getOfferId(), message); - if (trade.getState() == Trade.State.CONTRACT_SIGNED) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(state(Trade.State.CONTRACT_SIGNED) - .with(message) - .from(sender)) - .setup(tasks( - // TODO (woodser): validate request - ProcessSignContractResponse.class, - RemoveOffer.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, message); - }, - errorMessage -> { - handleTaskRunnerFault(sender, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) // extend timeout - .executeTasks(true); - awaitTradeLatch(); - } else { - // process sign contract response after contract signed - EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock - }); - } + processModel.setTradeMessage(message); + expect(state(Trade.State.CONTRACT_SIGNED) + .with(message) + .from(sender)) + .setup(tasks( + // TODO (woodser): validate request + ProcessSignContractResponse.class, + RemoveOffer.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, message); + }, + errorMessage -> { + handleTaskRunnerFault(sender, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) // extend timeout + .executeTasks(true); + awaitTradeLatch(); + } else { + // process sign contract response after contract signed + EasyBind.subscribe(trade.stateProperty(), state -> { + if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock + }); } - }).start(); + } } public void handleDepositResponse(DepositResponse response, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handleDepositResponse()"); - new Thread(() -> { - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), response); - processModel.setTradeMessage(response); - expect(anyState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK) - .with(response) - .from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress() - .setup(tasks( - // TODO (woodser): validate request - ProcessDepositResponse.class) - .using(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - this.errorMessageHandler = null; - handleTaskRunnerSuccess(sender, response); - if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized - }, - errorMessage -> { - handleTaskRunnerFault(sender, response, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } - }).start(); + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), response); + processModel.setTradeMessage(response); + expect(anyState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK) + .with(response) + .from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress() + .setup(tasks( + // TODO (woodser): validate request + ProcessDepositResponse.class) + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + this.errorMessageHandler = null; + handleTaskRunnerSuccess(sender, response); + if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized + }, + errorMessage -> { + handleTaskRunnerFault(sender, response, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } } public void handle(DepositsConfirmedMessage response, NodeAddress sender) { System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)"); - new Thread(() -> { - synchronized (trade) { - latchTrade(); - expect(new Condition(trade) - .with(response) - .from(sender)) - .setup(tasks(ProcessDepositsConfirmedMessage.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(sender, response); - }, - errorMessage -> { - handleTaskRunnerFault(sender, response, errorMessage); - }))) - .executeTasks(); - awaitTradeLatch(); - } - }).start(); + synchronized (trade) { + latchTrade(); + expect(new Condition(trade) + .with(response) + .from(sender)) + .setup(tasks(ProcessDepositsConfirmedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(sender, response); + }, + errorMessage -> { + handleTaskRunnerFault(sender, response, errorMessage); + }))) + .executeTasks(); + awaitTradeLatch(); + } } // received by seller and arbitrator @@ -414,43 +406,41 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D log.warn("Ignoring PaymentSentMessage since not seller or arbitrator"); return; } - new Thread(() -> { - // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case - // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received - // a mailbox message with PaymentSentMessage. - // TODO A better fix would be to add a listener for the wallet sync state and process - // the mailbox msg once wallet is ready and trade state set. - synchronized (trade) { - if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { - log.warn("Ignoring PaymentSentMessage which was already processed"); - return; - } - latchTrade(); - expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED) - .with(message) - .from(peer) - .preCondition(trade.getPayoutTx() == null, - () -> { - log.warn("We received a PaymentSentMessage but we have already created the payout tx " + - "so we ignore the message. This can happen if the ACK message to the peer did not " + - "arrive and the peer repeats sending us the message. We send another ACK msg."); - sendAckMessage(peer, message, true, null); - removeMailboxMessageAfterProcessing(message); - })) - .setup(tasks( - ApplyFilter.class, - ProcessPaymentSentMessage.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - (errorMessage) -> { - handleTaskRunnerFault(peer, message, errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); + // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case + // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received + // a mailbox message with PaymentSentMessage. + // TODO A better fix would be to add a listener for the wallet sync state and process + // the mailbox msg once wallet is ready and trade state set. + synchronized (trade) { + if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { + log.warn("Ignoring PaymentSentMessage which was already processed"); + return; } - }).start(); + latchTrade(); + expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED) + .with(message) + .from(peer) + .preCondition(trade.getPayoutTx() == null, + () -> { + log.warn("We received a PaymentSentMessage but we have already created the payout tx " + + "so we ignore the message. This can happen if the ACK message to the peer did not " + + "arrive and the peer repeats sending us the message. We send another ACK msg."); + sendAckMessage(peer, message, true, null); + removeMailboxMessageAfterProcessing(message); + })) + .setup(tasks( + ApplyFilter.class, + ProcessPaymentSentMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + (errorMessage) -> { + handleTaskRunnerFault(peer, message, errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); + } } // received by buyer and arbitrator @@ -761,21 +751,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D } private void sendDepositsConfirmedMessage() { - new Thread(() -> { - synchronized (trade) { - latchTrade(); - expect(new Condition(trade)) - .setup(tasks(getDepositsConfirmedTasks()) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(null, null, "SendDepositsConfirmedMessages"); - }, - (errorMessage) -> { - handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); - } - }).start(); + synchronized (trade) { + latchTrade(); + expect(new Condition(trade)) + .setup(tasks(getDepositsConfirmedTasks()) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(null, null, "SendDepositsConfirmedMessages"); + }, + (errorMessage) -> { + handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); + } } }