refactor trade protocol latch and timeouts

This commit is contained in:
woodser 2022-07-24 18:36:48 -04:00
parent 3f25a756ea
commit 3dcaf67edd
22 changed files with 346 additions and 368 deletions

View file

@ -186,20 +186,17 @@ public class XmrWalletService {
public MoneroWallet createMultisigWallet(String tradeId) {
log.info("{}.createMultisigWallet({})", getClass().getSimpleName(), tradeId);
Trade trade = tradeManager.getOpenTrade(tradeId).get();
synchronized (trade) {
if (multisigWallets.containsKey(trade.getId())) return multisigWallets.get(trade.getId());
String path = MONERO_MULTISIG_WALLET_PREFIX + trade.getId();
MoneroWallet multisigWallet = createWallet(new MoneroWalletConfig().setPath(path).setPassword(getWalletPassword()), null); // auto-assign port
MoneroWallet multisigWallet = createWallet(new MoneroWalletConfig().setPath(path).setPassword(getWalletPassword()), null, false); // auto-assign port
multisigWallets.put(trade.getId(), multisigWallet);
return multisigWallet;
}
}
// TODO (woodser): provide progress notifications during open?
public MoneroWallet getMultisigWallet(String tradeId) {
log.info("{}.getMultisigWallet({})", getClass().getSimpleName(), tradeId);
Trade trade = tradeManager.getTrade(tradeId);
synchronized (trade) {
if (multisigWallets.containsKey(trade.getId())) return multisigWallets.get(trade.getId());
String path = MONERO_MULTISIG_WALLET_PREFIX + trade.getId();
if (!walletExists(path)) throw new RuntimeException("Multisig wallet does not exist for trade " + trade.getId());
@ -207,7 +204,6 @@ public class XmrWalletService {
multisigWallets.put(trade.getId(), multisigWallet);
return multisigWallet;
}
}
public void saveWallet(MoneroWallet wallet) {
wallet.save();
@ -216,25 +212,19 @@ public class XmrWalletService {
public void closeMultisigWallet(String tradeId) {
log.info("{}.closeMultisigWallet({})", getClass().getSimpleName(), tradeId);
Trade trade = tradeManager.getTrade(tradeId);
synchronized (trade) {
if (!multisigWallets.containsKey(trade.getId())) throw new RuntimeException("Multisig wallet to close was not previously opened for trade " + trade.getId());
MoneroWallet wallet = multisigWallets.remove(trade.getId());
if (!multisigWallets.containsKey(tradeId)) throw new RuntimeException("Multisig wallet to close was not previously opened for trade " + tradeId);
MoneroWallet wallet = multisigWallets.remove(tradeId);
closeWallet(wallet, true);
}
}
public boolean deleteMultisigWallet(String tradeId) {
log.info("{}.deleteMultisigWallet({})", getClass().getSimpleName(), tradeId);
Trade trade = tradeManager.getTrade(tradeId);
synchronized (trade) {
String walletName = MONERO_MULTISIG_WALLET_PREFIX + tradeId;
if (!walletExists(walletName)) return false;
if (multisigWallets.containsKey(trade.getId())) closeMultisigWallet(tradeId);
if (multisigWallets.containsKey(tradeId)) closeMultisigWallet(tradeId); // TODO: synchronize
deleteWallet(walletName);
return true;
}
}
public MoneroTxWallet createTx(List<MoneroDestination> destinations) {
try {
@ -407,7 +397,7 @@ public class XmrWalletService {
if (MoneroUtils.walletExists(xmrWalletFile.getPath())) {
wallet = openWallet(walletConfig, rpcBindPort);
} else if (connectionsService.getConnection() != null && Boolean.TRUE.equals(connectionsService.getConnection().isConnected())) {
wallet = createWallet(walletConfig, rpcBindPort);
wallet = createWallet(walletConfig, rpcBindPort, true);
}
// wallet is not initialized until connected to a daemon
@ -437,10 +427,10 @@ public class XmrWalletService {
}
}
private MoneroWalletRpc createWallet(MoneroWalletConfig config, Integer port) {
private MoneroWalletRpc createWallet(MoneroWalletConfig config, Integer port, boolean sync) {
// start monero-wallet-rpc instance
MoneroWalletRpc walletRpc = startWalletRpcInstance(port);
MoneroWalletRpc walletRpc = startWalletRpcInstance(port, sync);
// must be connected to daemon
MoneroRpcConnection connection = connectionsService.getConnection();
@ -449,9 +439,17 @@ public class XmrWalletService {
// create wallet
try {
log.info("Creating wallet " + config.getPath());
MoneroRpcConnection daemonConnection = config.getServer();
if (!sync) config.setServer(null);
walletRpc.createWallet(config);
if (sync) {
log.info("Syncing wallet " + config.getPath() + " in background");
walletRpc.startSyncing(connectionsService.getDefaultRefreshPeriodMs());
log.info("Done starting background wallet sync for " + config.getPath());
} else {
walletRpc.setDaemonConnection(daemonConnection);
}
log.info("Done creating wallet " + config.getPath());
return walletRpc;
} catch (Exception e) {
e.printStackTrace();
@ -463,7 +461,7 @@ public class XmrWalletService {
private MoneroWalletRpc openWallet(MoneroWalletConfig config, Integer port) {
// start monero-wallet-rpc instance
MoneroWalletRpc walletRpc = startWalletRpcInstance(port);
MoneroWalletRpc walletRpc = startWalletRpcInstance(port, true);
// open wallet
try {
@ -474,10 +472,6 @@ public class XmrWalletService {
// start syncing wallet in background
log.info("Syncing wallet " + config.getPath() + " in background");
walletRpc.startSyncing(connectionsService.getDefaultRefreshPeriodMs());
// sync wallet (blocks)
log.info("Syncing wallet " + config.getPath());
walletRpc.sync(); // TODO: does this initiate 2 syncs back-to-back?
log.info("Done syncing wallet " + config.getPath());
return walletRpc;
} catch (Exception e) {
@ -487,7 +481,7 @@ public class XmrWalletService {
}
}
private MoneroWalletRpc startWalletRpcInstance(Integer port) {
private MoneroWalletRpc startWalletRpcInstance(Integer port, boolean withConnection) {
// check if monero-wallet-rpc exists
if (!new File(MONERO_WALLET_RPC_PATH).exists()) throw new Error("monero-wallet-rpc executable doesn't exist at path " + MONERO_WALLET_RPC_PATH
@ -497,7 +491,7 @@ public class XmrWalletService {
List<String> cmd = new ArrayList<>(Arrays.asList( // modifiable list
MONERO_WALLET_RPC_PATH, "--" + MONERO_NETWORK_TYPE.toString().toLowerCase(), "--rpc-login",
MONERO_WALLET_RPC_USERNAME + ":" + getWalletPassword(), "--wallet-dir", walletDir.toString()));
MoneroRpcConnection connection = connectionsService.getConnection();
MoneroRpcConnection connection = withConnection ? connectionsService.getConnection() : null;
if (connection != null) {
cmd.add("--daemon-address");
cmd.add(connection.getUri());

View file

@ -132,7 +132,7 @@ public abstract class Trade implements Tradable, Model {
// We changes order in trade protocol of publishing deposit tx and sending it to the peer.
// Now we send it first to the peer and only if that succeeds we publish it to avoid likelihood of
// failed trades. We do not want to change the order of the enum though so we keep it here as it was originally.
TAKER_PUBLISHED_DEPOSIT_TX(Phase.DEPOSIT_PUBLISHED),
ARBITRATOR_PUBLISHED_DEPOSIT_TX(Phase.DEPOSIT_PUBLISHED),
// DEPOSIT_TX_PUBLISHED_MSG
// taker perspective

View file

@ -67,6 +67,7 @@ import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.TorNetworkNode;
import com.google.common.collect.ImmutableList;
import bisq.common.ClockWatcher;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.crypto.KeyRing;
import bisq.common.handlers.ErrorMessageHandler;
@ -884,6 +885,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
private void updateTradePeriodState() {
getObservableList().forEach(trade -> {
UserThread.execute(() -> { // prevent concurrent modification error
if (!trade.isPayoutPublished()) {
Date maxTradePeriodDate = trade.getMaxTradePeriodDate();
Date halfTradePeriodDate = trade.getHalfTradePeriodDate();
@ -899,6 +901,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
}
});
});
}
@ -1060,12 +1063,13 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
return closedTradableManager.getClosedTrades().stream().filter(e -> e.getId().equals(tradeId)).findFirst();
}
private void removeTrade(Trade trade) {
private synchronized void removeTrade(Trade trade) {
log.info("TradeManager.removeTrade()");
synchronized(tradableList) {
if (!tradableList.contains(trade)) return;
synchronized (trade) {
// remove trade
tradableList.remove(trade);
// unreserve trade key images
if (trade instanceof TakerTrade && trade.getSelf().getReserveTxKeyImages() != null) {
@ -1079,11 +1083,9 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
tradableList.remove(trade);
requestPersistence();
}
}
}
private void addTrade(Trade trade) {
synchronized(tradableList) {

View file

@ -33,9 +33,9 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
System.out.println("ArbitratorProtocol.handleInitTradeRequest()");
synchronized (trade) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set
latchTrade();
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
@ -46,11 +46,10 @@ public class ArbitratorProtocol extends DisputeProtocol {
ArbitratorSendsInitTradeAndMultisigRequests.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -63,9 +62,9 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleInitMultisigRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
@ -73,11 +72,10 @@ public class ArbitratorProtocol extends DisputeProtocol {
ProcessInitMultisigRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -90,9 +88,9 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleSignContractRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
@ -101,11 +99,10 @@ public class ArbitratorProtocol extends DisputeProtocol {
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -117,22 +114,23 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleDepositRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ArbitratorProcessesDepositRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
if (trade.getState() == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TX) {
stopTimeout();
this.errorMessageHandler = null;
}
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))

View file

@ -19,7 +19,6 @@ package bisq.core.trade.protocol;
import bisq.core.trade.BuyerAsMakerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.Trade.State;
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
import bisq.core.trade.messages.DepositResponse;
import bisq.core.trade.messages.DepositTxAndDelayedPayoutTxMessage;
@ -77,8 +76,8 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
synchronized (trade) {
this.errorMessageHandler = errorMessageHandler;
latchTrade();
this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
@ -89,11 +88,10 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
MakerSendsInitTradeRequestIfUnreserved.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -106,9 +104,9 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
@ -117,13 +115,13 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
}))
.withTimeout(TRADE_TIMEOUT))
.executeTasks();
awaitTradeLatch();
}
@ -133,9 +131,9 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
@ -144,13 +142,13 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
}))
.withTimeout(TRADE_TIMEOUT))
.executeTasks();
awaitTradeLatch();
}
@ -158,12 +156,13 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse()");
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId());
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
if (trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
.with(message)
.from(sender))
@ -172,11 +171,10 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT)) // extend timeout
@ -184,7 +182,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == State.CONTRACT_SIGNATURE_REQUESTED) handleSignContractResponse(message, sender);
if (state == Trade.State.CONTRACT_SIGNATURE_REQUESTED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
});
}
}
@ -194,10 +192,10 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
latchTrade();
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
expect(state(Trade.State.MAKER_SENT_PUBLISH_DEPOSIT_TX_REQUEST)
.with(response)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
@ -205,11 +203,10 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -220,12 +217,13 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest()");
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest() " + trade.getId());
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request);
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
if (trade.getState() == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
@ -236,12 +234,10 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
unlatchTrade();
this.errorMessageHandler = null;
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -249,7 +245,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
if (state == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) new Thread(() -> handlePaymentAccountPayloadRequest(request, sender)).start(); // process notification without trade lock
});
}
}

View file

@ -21,7 +21,6 @@ package bisq.core.trade.protocol;
import bisq.core.offer.Offer;
import bisq.core.trade.BuyerAsTakerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.Trade.State;
import bisq.core.trade.handlers.TradeResultHandler;
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
import bisq.core.trade.messages.DepositResponse;
@ -94,9 +93,9 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".onTakeOffer()");
synchronized (trade) {
latchTrade();
this.tradeResultHandler = tradeResultHandler;
this.errorMessageHandler = errorMessageHandler;
latchTrade();
expect(phase(Trade.Phase.INIT)
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
@ -106,6 +105,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
TakerSendsInitTradeRequestToArbitrator.class)
.using(new TradeTaskRunner(trade,
() -> {
startTimeout(TRADE_TIMEOUT);
unlatchTrade();
},
errorMessage -> {
@ -121,9 +121,9 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
@ -132,11 +132,10 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -149,9 +148,9 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
@ -160,11 +159,10 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -178,9 +176,10 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse()");
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
if (trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
.with(message)
.from(sender))
@ -189,20 +188,18 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
.withTimeout(TRADE_TIMEOUT)) // extend timeout
.executeTasks();
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state != State.CONTRACT_SIGNATURE_REQUESTED) return;
handleSignContractResponse(message, sender);
if (state == Trade.State.CONTRACT_SIGNATURE_REQUESTED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
});
}
}
@ -212,10 +209,10 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
latchTrade();
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
expect(state(Trade.State.MAKER_SENT_PUBLISH_DEPOSIT_TX_REQUEST)
.with(response)
.from(sender))
.setup(tasks(
@ -223,11 +220,10 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -241,10 +237,11 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest()");
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
if (trade.getState() == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) // TODO (woodser): rename to RECEIVED_DEPOSIT_TX_PUBLISHED_MSG
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
@ -253,13 +250,11 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
unlatchTrade();
this.errorMessageHandler = null;
handleTaskRunnerSuccess(sender, request);
tradeResultHandler.handleResult(trade); // trade is initialized
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -267,7 +262,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
if (state == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) new Thread(() -> handlePaymentAccountPayloadRequest(request, sender)).start(); // process notification without trade lock
});
}
}

View file

@ -30,9 +30,8 @@ import bisq.core.trade.protocol.tasks.buyer.BuyerPreparesPaymentSentMessage;
import bisq.core.trade.protocol.tasks.buyer.BuyerProcessesPaymentReceivedMessage;
import bisq.core.trade.protocol.tasks.buyer.BuyerSendsPaymentSentMessage;
import bisq.core.trade.protocol.tasks.buyer.BuyerSetupPayoutTxListener;
import bisq.core.util.Validator;
import bisq.network.p2p.NodeAddress;
import java.util.concurrent.CountDownLatch;
import bisq.common.handlers.ErrorMessageHandler;
import bisq.common.handlers.ResultHandler;
@ -127,9 +126,8 @@ public abstract class BuyerProtocol extends DisputeProtocol {
public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerProtocol.onPaymentStarted()");
synchronized (trade) { // TODO (woodser): UpdateMultisigWithTradingPeer sends UpdateMultisigRequest and waits for UpdateMultisigResponse which is new thread, so synchronized (trade) in subsequent pipeline blocks forever if we hold on with countdown latch in this function
synchronized (trade) {
BuyerEvent event = BuyerEvent.PAYMENT_SENT;
CountDownLatch latch = new CountDownLatch(1);
expect(phase(Trade.Phase.DEPOSIT_UNLOCKED)
.with(event)
.preCondition(trade.confirmPermitted()))
@ -138,15 +136,13 @@ public abstract class BuyerProtocol extends DisputeProtocol {
//UpdateMultisigWithTradingPeer.class, // TODO (woodser): can use this to test protocol with updated multisig from peer. peer should attempt to send updated multisig hex earlier as part of protocol. cannot use with countdown latch because response comes back in a separate thread and blocks on trade
BuyerPreparesPaymentSentMessage.class,
//BuyerSetupPayoutTxListener.class,
BuyerSendsPaymentSentMessage.class)
BuyerSendsPaymentSentMessage.class) // don't latch trade because this blocks and runs in background
.using(new TradeTaskRunner(trade,
() -> {
latch.countDown();
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
latch.countDown();
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
@ -162,8 +158,9 @@ public abstract class BuyerProtocol extends DisputeProtocol {
protected void handle(PaymentReceivedMessage message, NodeAddress peer) {
log.info("BuyerProtocol.handle(PaymentReceivedMessage)");
synchronized (trade) {
processModel.setTradeMessage(message);
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.PAYMENT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
.with(message)
.from(peer))
@ -172,13 +169,14 @@ public abstract class BuyerProtocol extends DisputeProtocol {
BuyerProcessesPaymentReceivedMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
stopTimeout();
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
handleError(errorMessage);
stopTimeout();
handleTaskRunnerFault(peer, message, errorMessage);
})))
}))
.withTimeout(TRADE_TIMEOUT))
.executeTasks();
awaitTradeLatch();
}

View file

@ -20,7 +20,6 @@ package bisq.core.trade.protocol;
import bisq.core.trade.SellerAsMakerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.Trade.State;
import bisq.core.trade.messages.PaymentSentMessage;
import bisq.core.trade.messages.DepositResponse;
import bisq.core.trade.messages.DepositTxMessage;
@ -77,8 +76,8 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
synchronized (trade) {
this.errorMessageHandler = errorMessageHandler;
latchTrade();
this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
@ -89,11 +88,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
MakerSendsInitTradeRequestIfUnreserved.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -106,9 +104,9 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
@ -117,11 +115,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -134,9 +131,9 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
@ -145,11 +142,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -163,9 +159,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse()");
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
if (trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
.with(message)
.from(sender))
@ -174,19 +171,18 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
.withTimeout(TRADE_TIMEOUT)) // extend timeout
.executeTasks();
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == State.CONTRACT_SIGNATURE_REQUESTED) handleSignContractResponse(message, sender);
if (state == Trade.State.CONTRACT_SIGNATURE_REQUESTED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
});
}
}
@ -196,10 +192,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
latchTrade();
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
expect(state(Trade.State.MAKER_SENT_PUBLISH_DEPOSIT_TX_REQUEST)
.with(response)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
@ -207,11 +203,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -225,9 +220,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest()");
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request);
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
if (trade.getState() == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
@ -238,12 +234,10 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
unlatchTrade();
this.errorMessageHandler = null;
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -251,7 +245,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
if (state == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) new Thread(() -> handlePaymentAccountPayloadRequest(request, sender)).start(); // process notification without trade lock
});
}
}

View file

@ -21,7 +21,6 @@ package bisq.core.trade.protocol;
import bisq.core.offer.Offer;
import bisq.core.trade.SellerAsTakerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.Trade.State;
import bisq.core.trade.handlers.TradeResultHandler;
import bisq.core.trade.messages.PaymentSentMessage;
import bisq.core.trade.messages.DepositResponse;
@ -87,9 +86,9 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".onTakeOffer()");
synchronized (trade) {
latchTrade();
this.tradeResultHandler = tradeResultHandler;
this.errorMessageHandler = errorMessageHandler;
latchTrade();
expect(phase(Trade.Phase.INIT)
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
@ -99,6 +98,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
TakerSendsInitTradeRequestToArbitrator.class)
.using(new TradeTaskRunner(trade,
() -> {
startTimeout(TRADE_TIMEOUT);
unlatchTrade();
},
errorMessage -> {
@ -114,9 +114,9 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
@ -125,11 +125,10 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -142,9 +141,9 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
latchTrade();
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
@ -153,11 +152,10 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -168,12 +166,13 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse()");
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId());
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == State.CONTRACT_SIGNATURE_REQUESTED) {
if (trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
.with(message)
.from(sender))
@ -182,20 +181,18 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
.withTimeout(TRADE_TIMEOUT)) // extend timeout
.executeTasks();
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state != State.CONTRACT_SIGNATURE_REQUESTED) return;
handleSignContractResponse(message, sender);
if (state == Trade.State.CONTRACT_SIGNATURE_REQUESTED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
});
}
}
@ -205,10 +202,10 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()");
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
latchTrade();
expect(state(Trade.State.CONTRACT_SIGNATURE_REQUESTED)
expect(state(Trade.State.MAKER_SENT_PUBLISH_DEPOSIT_TX_REQUEST)
.with(response)
.from(sender))
.setup(tasks(
@ -216,11 +213,10 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
startTimeout(TRADE_TIMEOUT);
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -231,13 +227,14 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest()");
System.out.println(getClass().getCanonicalName() + ".handlePaymentAccountPayloadRequest() " + trade.getId());
synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request);
if (tradeLatch == null) latchTrade(); // may be initialized from previous message
if (trade.getState() == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
if (trade.getState() == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) // TODO (woodser): rename to RECEIVED_DEPOSIT_TX_PUBLISHED_MSG
expect(state(Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
@ -246,13 +243,11 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
unlatchTrade();
this.errorMessageHandler = null;
handleTaskRunnerSuccess(sender, request);
tradeResultHandler.handleResult(trade); // trade is initialized
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
@ -260,7 +255,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
awaitTradeLatch();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) handlePaymentAccountPayloadRequest(request, sender);
if (state == Trade.State.MAKER_RECEIVED_DEPOSIT_TX_PUBLISHED_MSG) new Thread(() -> handlePaymentAccountPayloadRequest(request, sender)).start(); // process notification without trade lock
});
}
}

View file

@ -83,7 +83,11 @@ public abstract class SellerProtocol extends DisputeProtocol {
// TODO A better fix would be to add a listener for the wallet sync state and process
// the mailbox msg once wallet is ready and trade state set.
synchronized (trade) {
//CountDownLatch latch = new CountDownLatch(1); // TODO: apply latch countdown
if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
log.warn("Ignoring PaymentSentMessage which was already processed");
return;
}
latchTrade();
expect(anyPhase(Trade.Phase.DEPOSIT_UNLOCKED, Trade.Phase.DEPOSIT_PUBLISHED)
.with(message)
.from(peer)
@ -98,8 +102,19 @@ public abstract class SellerProtocol extends DisputeProtocol {
.setup(tasks(
SellerProcessesPaymentSentMessage.class,
ApplyFilter.class,
getVerifyPeersFeePaymentClass()))
getVerifyPeersFeePaymentClass())
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(peer, message);
},
(errorMessage) -> {
stopTimeout();
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(TRADE_TIMEOUT))
.executeTasks();
awaitTradeLatch();
}
}
@ -111,7 +126,6 @@ public abstract class SellerProtocol extends DisputeProtocol {
log.info("SellerProtocol.onPaymentReceived()");
synchronized (trade) {
SellerEvent event = SellerEvent.PAYMENT_RECEIVED;
// CountDownLatch latch = new CountDownLatch(1); // TODO (woodser): user countdown latch, but freezes legacy app
expect(anyPhase(Trade.Phase.PAYMENT_SENT)
.with(event)
.preCondition(trade.confirmPermitted()))

View file

@ -107,7 +107,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
public void onWithdrawCompleted() {
cleanup();
log.info("Withdraw completed");
}
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
@ -218,18 +218,15 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// TODO (woodser): update to use fluent for consistency
public void handleUpdateMultisigRequest(UpdateMultisigRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
synchronized (trade) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
latchTrade();
TradeTaskRunner taskRunner = new TradeTaskRunner(trade,
() -> {
unlatchTrade();
stopTimeout();
handleTaskRunnerSuccess(peer, message, "handleUpdateMultisigRequest");
},
errorMessage -> {
handleError(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
});
taskRunner.addTasks(
@ -239,7 +236,6 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
taskRunner.run();
awaitTradeLatch();
}
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -365,32 +361,6 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
);
}
// TODO: trade protocols block if these are synchronized
protected void handleError(String errorMessage) {
log.error(errorMessage);
unlatchTrade();
trade.setErrorMessage(errorMessage);
if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage(errorMessage);
processModel.getTradeManager().requestPersistence();
cleanup();
}
protected void latchTrade() {
if (tradeLatch != null) throw new RuntimeException("Trade latch is not null. This should never happen.");
tradeLatch = new CountDownLatch(1);
}
protected void unlatchTrade() {
if (tradeLatch != null) tradeLatch.countDown();
tradeLatch = null;
}
protected void awaitTradeLatch() {
if (tradeLatch == null) return;
TradeUtils.awaitLatch(tradeLatch);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Timeout
///////////////////////////////////////////////////////////////////////////////////////////
@ -478,6 +448,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// again.
removeMailboxMessageAfterProcessing(message);
}
unlatchTrade();
}
void handleTaskRunnerFault(NodeAddress ackReceiver, @Nullable TradeMessage message, String source, String errorMessage) {
@ -486,7 +457,33 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
if (message != null) {
sendAckMessage(ackReceiver, message, false, errorMessage);
}
cleanup();
handleError(errorMessage);
}
protected void handleError(String errorMessage) {
stopTimeout();
log.error(errorMessage);
trade.setErrorMessage(errorMessage);
processModel.getTradeManager().requestPersistence();
if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage(errorMessage);
unlatchTrade();
}
protected void latchTrade() {
if (tradeLatch != null) throw new RuntimeException("Trade latch is not null. That should never happen.");
tradeLatch = new CountDownLatch(1);
}
protected void unlatchTrade() {
if (tradeLatch != null) tradeLatch.countDown();
tradeLatch = null;
}
protected void awaitTradeLatch() {
if (tradeLatch == null) return;
TradeUtils.awaitLatch(tradeLatch);
}
private boolean isMyMessage(NetworkEnvelope message) {
@ -501,10 +498,4 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
return false;
}
}
private void cleanup() {
stopTimeout();
// We do not remove the decryptedDirectMessageListener as in case of not critical failures we want allow to receive
// follow-up messages still
}
}

View file

@ -25,7 +25,6 @@ import bisq.common.taskrunner.TaskRunner;
import bisq.core.btc.wallet.XmrWalletService;
import bisq.core.offer.Offer;
import bisq.core.offer.OfferDirection;
import bisq.core.offer.OfferPayload;
import bisq.core.trade.Trade;
import bisq.core.trade.messages.DepositRequest;
import bisq.core.trade.messages.DepositResponse;
@ -112,6 +111,10 @@ public class ArbitratorProcessesDepositRequest extends TradeTask {
daemon.submitTxHex(processModel.getMaker().getDepositTxHex()); // TODO (woodser): check that result is good. will need to release funds if one is submitted
daemon.submitTxHex(processModel.getTaker().getDepositTxHex());
// update trade state
log.info("Arbitrator submitted deposit txs for trade " + trade.getId());
trade.setState(Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TX);
// create deposit response
DepositResponse response = new DepositResponse(
trade.getOffer().getId(),
@ -124,6 +127,8 @@ public class ArbitratorProcessesDepositRequest extends TradeTask {
// send deposit response to maker and taker
sendDepositResponse(trade.getMakerNodeAddress(), trade.getMakerPubKeyRing(), response);
sendDepositResponse(trade.getTakerNodeAddress(), trade.getTakerPubKeyRing(), response);
} else {
log.info("Arbitrator waiting for deposit request from maker and taker for trade " + trade.getId());
}
// TODO (woodser): request persistence?
@ -134,6 +139,7 @@ public class ArbitratorProcessesDepositRequest extends TradeTask {
}
private void sendDepositResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, DepositResponse response) {
log.info("Sending deposit response to trader={}; offerId={}", nodeAddress, trade.getId());
processModel.getP2PService().sendEncryptedDirectMessage(nodeAddress, pubKeyRing, response, new SendDirectMessageListener() {
@Override
public void onArrived() {

View file

@ -170,7 +170,7 @@ public class ArbitratorSendsInitTradeAndMultisigRequests extends TradeTask {
new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived at arbitrator: offerId={}; uid={}", initMultisigRequest.getClass().getSimpleName(), initMultisigRequest.getTradeId(), initMultisigRequest.getUid());
log.info("{} arrived at maker: offerId={}; uid={}", initMultisigRequest.getClass().getSimpleName(), initMultisigRequest.getTradeId(), initMultisigRequest.getUid());
}
@Override
public void onFault(String errorMessage) {
@ -190,7 +190,7 @@ public class ArbitratorSendsInitTradeAndMultisigRequests extends TradeTask {
new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived at peer: offerId={}; uid={}", initMultisigRequest.getClass().getSimpleName(), initMultisigRequest.getTradeId(), initMultisigRequest.getUid());
log.info("{} arrived at taker: offerId={}; uid={}", initMultisigRequest.getClass().getSimpleName(), initMultisigRequest.getTradeId(), initMultisigRequest.getUid());
}
@Override
public void onFault(String errorMessage) {

View file

@ -82,9 +82,9 @@ public class ProcessInitMultisigRequest extends TradeTask {
// reconcile peer's established multisig hex with message
if (multisigParticipant.getPreparedMultisigHex() == null) multisigParticipant.setPreparedMultisigHex(request.getPreparedMultisigHex());
else if (!multisigParticipant.getPreparedMultisigHex().equals(request.getPreparedMultisigHex())) throw new RuntimeException("Message's prepared multisig differs from previous messages, previous: " + multisigParticipant.getPreparedMultisigHex() + ", message: " + request.getPreparedMultisigHex());
else if (request.getPreparedMultisigHex() != null && !multisigParticipant.getPreparedMultisigHex().equals(request.getPreparedMultisigHex())) throw new RuntimeException("Message's prepared multisig differs from previous messages, previous: " + multisigParticipant.getPreparedMultisigHex() + ", message: " + request.getPreparedMultisigHex());
if (multisigParticipant.getMadeMultisigHex() == null) multisigParticipant.setMadeMultisigHex(request.getMadeMultisigHex());
else if (!multisigParticipant.getMadeMultisigHex().equals(request.getMadeMultisigHex())) throw new RuntimeException("Message's made multisig differs from previous messages: " + request.getMadeMultisigHex() + " versus " + multisigParticipant.getMadeMultisigHex());
else if (request.getMadeMultisigHex() != null && !multisigParticipant.getMadeMultisigHex().equals(request.getMadeMultisigHex())) throw new RuntimeException("Message's made multisig differs from previous messages: " + request.getMadeMultisigHex() + " versus " + multisigParticipant.getMadeMultisigHex());
// prepare multisig if applicable
boolean updateParticipants = false;

View file

@ -18,19 +18,14 @@
package bisq.core.trade.protocol.tasks;
import static com.google.common.base.Preconditions.checkNotNull;
import bisq.common.UserThread;
import bisq.common.taskrunner.TaskRunner;
import bisq.core.btc.model.XmrAddressEntry;
import bisq.core.payment.payload.PaymentAccountPayload;
import bisq.core.trade.MakerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.Trade.State;
import bisq.core.trade.messages.PaymentAccountPayloadRequest;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import monero.wallet.MoneroWallet;
import monero.wallet.model.MoneroTxWallet;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.Subscription;
@ -68,8 +63,4 @@ public class ProcessPaymentAccountPayloadRequest extends TradeTask {
failed(t);
}
}
private void unSubscribe() {
if (tradeStateSubscription != null) tradeStateSubscription.unsubscribe();
}
}

View file

@ -22,7 +22,6 @@ import bisq.common.app.Version;
import bisq.common.crypto.PubKeyRing;
import bisq.common.crypto.Sig;
import bisq.common.taskrunner.TaskRunner;
import bisq.common.util.Utilities;
import bisq.core.trade.ArbitratorTrade;
import bisq.core.trade.Contract;
import bisq.core.trade.Trade;
@ -102,7 +101,7 @@ public class ProcessSignContractRequest extends TradeTask {
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", response.getClass().getSimpleName(), recipient1, trade.getId());
ack1 = true;
if (ack1 && (recipient2 == null || ack2)) complete();
if (ack1 && (recipient2 == null || ack2)) completeAux();
}
@Override
public void onFault(String errorMessage) {
@ -119,7 +118,7 @@ public class ProcessSignContractRequest extends TradeTask {
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", response.getClass().getSimpleName(), recipient2, trade.getId());
ack2 = true;
if (ack1 && ack2) complete();
if (ack1 && ack2) completeAux();
}
@Override
public void onFault(String errorMessage) {
@ -129,11 +128,14 @@ public class ProcessSignContractRequest extends TradeTask {
}
});
}
// update trade state
trade.setState(State.CONTRACT_SIGNATURE_REQUESTED);
} catch (Throwable t) {
failed(t);
}
}
private void completeAux() {
trade.setState(State.CONTRACT_SIGNATURE_REQUESTED); // TODO: rename to contract_signature_request_received
processModel.getTradeManager().requestPersistence();
complete();
}
}

View file

@ -87,7 +87,8 @@ public class ProcessSignContractResponse extends TradeTask {
processModel.getP2PService().sendEncryptedDirectMessage(trade.getArbitratorNodeAddress(), trade.getArbitratorPubKeyRing(), request, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getArbitratorNodeAddress(), trade.getId());
log.info("{} arrived: arbitrator={}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getArbitratorNodeAddress(), trade.getId(), request.getUid());
trade.setState(Trade.State.MAKER_SENT_PUBLISH_DEPOSIT_TX_REQUEST); // TODO: rename to DEPOSIT_REQUESTED
processModel.getTradeManager().requestPersistence();
complete();
}
@ -99,6 +100,7 @@ public class ProcessSignContractResponse extends TradeTask {
}
});
} else {
log.info("Waiting for more contract signatures to send deposit request");
complete(); // does not yet have needed signatures
}
} catch (Throwable t) {

View file

@ -124,6 +124,7 @@ public class SendSignContractRequestAfterMultisig extends TradeTask {
}
private void completeAux() {
processModel.getTradeManager().requestPersistence();
processModel.getXmrWalletService().saveWallet(processModel.getXmrWalletService().getWallet());
complete();
}

View file

@ -35,7 +35,6 @@ import javafx.beans.value.ChangeListener;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import monero.wallet.MoneroWallet;
/**
* We send the seller the BuyerSendPaymentSentMessage.

View file

@ -237,8 +237,8 @@ class GrpcTradesService extends TradesImplBase {
put(getGetTradeMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getGetTradesMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getTakeOfferMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getConfirmPaymentStartedMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getConfirmPaymentReceivedMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getConfirmPaymentStartedMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getConfirmPaymentReceivedMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getKeepFundsMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getWithdrawFundsMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getGetChatMessagesMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));

View file

@ -428,7 +428,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
// #################### Phase DEPOSIT_PAID
case TAKER_PUBLISHED_DEPOSIT_TX:
case ARBITRATOR_PUBLISHED_DEPOSIT_TX:
case TAKER_SAW_DEPOSIT_TX_IN_NETWORK:
// DEPOSIT_TX_PUBLISHED_MSG

View file

@ -1632,7 +1632,7 @@ message Trade {
MAKER_STORED_IN_MAILBOX_PUBLISH_DEPOSIT_TX_REQUEST = 7;
MAKER_SEND_FAILED_PUBLISH_DEPOSIT_TX_REQUEST = 8;
TAKER_RECEIVED_PUBLISH_DEPOSIT_TX_REQUEST = 9;
TAKER_PUBLISHED_DEPOSIT_TX = 10;
ARBITRATOR_PUBLISHED_DEPOSIT_TX = 10;
TAKER_SAW_DEPOSIT_TX_IN_NETWORK = 11;
TAKER_SENT_DEPOSIT_TX_PUBLISHED_MSG = 12;
TAKER_SAW_ARRIVED_DEPOSIT_TX_PUBLISHED_MSG = 13;