process payment confirmation messages off main thread

This commit is contained in:
woodser 2022-08-31 16:31:09 -04:00
parent dbc7ff265a
commit b95c689190
2 changed files with 58 additions and 54 deletions

View file

@ -127,25 +127,27 @@ public abstract class BuyerProtocol extends DisputeProtocol {
protected void handle(PaymentReceivedMessage message, NodeAddress peer) { protected void handle(PaymentReceivedMessage message, NodeAddress peer) {
log.info("BuyerProtocol.handle(PaymentReceivedMessage)"); log.info("BuyerProtocol.handle(PaymentReceivedMessage)");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), message); latchTrade();
processModel.setTradeMessage(message); Validator.checkTradeId(processModel.getOfferId(), message);
expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED) processModel.setTradeMessage(message);
.with(message) expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYMENT_RECEIVED)
.from(peer)) .with(message)
.setup(tasks( .from(peer))
BuyerProcessesPaymentReceivedMessage.class) .setup(tasks(
.using(new TradeTaskRunner(trade, BuyerProcessesPaymentReceivedMessage.class)
() -> { .using(new TradeTaskRunner(trade,
handleTaskRunnerSuccess(peer, message); () -> {
}, handleTaskRunnerSuccess(peer, message);
errorMessage -> { },
handleTaskRunnerFault(peer, message, errorMessage); errorMessage -> {
}))) handleTaskRunnerFault(peer, message, errorMessage);
.executeTasks(true); })))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }

View file

@ -77,42 +77,44 @@ public abstract class SellerProtocol extends DisputeProtocol {
protected void handle(PaymentSentMessage message, NodeAddress peer) { protected void handle(PaymentSentMessage message, NodeAddress peer) {
log.info("SellerProtocol.handle(PaymentSentMessage)"); log.info("SellerProtocol.handle(PaymentSentMessage)");
// We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case new Thread(() -> {
// that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case
// a mailbox message with PaymentSentMessage. // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received
// TODO A better fix would be to add a listener for the wallet sync state and process // a mailbox message with PaymentSentMessage.
// the mailbox msg once wallet is ready and trade state set. // TODO A better fix would be to add a listener for the wallet sync state and process
synchronized (trade) { // the mailbox msg once wallet is ready and trade state set.
if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { synchronized (trade) {
log.warn("Ignoring PaymentSentMessage which was already processed"); if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
return; log.warn("Ignoring PaymentSentMessage which was already processed");
return;
}
latchTrade();
expect(anyPhase(Trade.Phase.DEPOSITS_UNLOCKED, Trade.Phase.DEPOSITS_PUBLISHED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(
ApplyFilter.class,
SellerProcessesPaymentSentMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
(errorMessage) -> {
stopTimeout();
handleTaskRunnerFault(peer, message, errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
} }
latchTrade(); }).start();
expect(anyPhase(Trade.Phase.DEPOSITS_UNLOCKED, Trade.Phase.DEPOSITS_PUBLISHED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(
ApplyFilter.class,
SellerProcessesPaymentSentMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
(errorMessage) -> {
stopTimeout();
handleTaskRunnerFault(peer, message, errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
}
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////