diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index 832ea91a..e463e831 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -47,6 +47,7 @@ public class ThreadUtils { synchronized (THREADS) { THREADS.put(threadId, Thread.currentThread()); } + Thread.currentThread().setName(threadId); command.run(); }); } diff --git a/core/src/main/java/haveno/core/api/XmrConnectionService.java b/core/src/main/java/haveno/core/api/XmrConnectionService.java index 0c78fc98..f4122e4a 100644 --- a/core/src/main/java/haveno/core/api/XmrConnectionService.java +++ b/core/src/main/java/haveno/core/api/XmrConnectionService.java @@ -36,7 +36,10 @@ import haveno.network.Socks5ProxyProvider; import haveno.network.p2p.P2PService; import haveno.network.p2p.P2PServiceListener; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import javafx.beans.property.IntegerProperty; import javafx.beans.property.LongProperty; @@ -103,6 +106,12 @@ public final class XmrConnectionService { private boolean isShutDownStarted; private List listeners = new ArrayList<>(); + // connection switching + private static final int EXCLUDE_CONNECTION_SECONDS = 300; + private static final int SKIP_SWITCH_WITHIN_MS = 60000; + private Set excludedConnections = new HashSet<>(); + private long lastSwitchRequestTimestamp; + @Inject public XmrConnectionService(P2PService p2PService, Config config, @@ -201,12 +210,6 @@ public final class XmrConnectionService { return connectionManager.getConnections(); } - public void switchToBestConnection() { - if (isFixedConnection() || !connectionManager.getAutoSwitch()) return; - MoneroRpcConnection bestConnection = getBestAvailableConnection(); - if (bestConnection != null) setConnection(bestConnection); - } - public void setConnection(String connectionUri) { accountService.checkAccountOpen(); connectionManager.setConnection(connectionUri); // listener will update connection list @@ -244,10 +247,67 @@ public final class XmrConnectionService { public MoneroRpcConnection getBestAvailableConnection() { accountService.checkAccountOpen(); List ignoredConnections = new ArrayList(); - if (xmrLocalNode.shouldBeIgnored() && connectionManager.hasConnection(xmrLocalNode.getUri())) ignoredConnections.add(connectionManager.getConnectionByUri(xmrLocalNode.getUri())); + addLocalNodeIfIgnored(ignoredConnections); return connectionManager.getBestAvailableConnection(ignoredConnections.toArray(new MoneroRpcConnection[0])); } + private MoneroRpcConnection getBestAvailableConnection(Collection ignoredConnections) { + accountService.checkAccountOpen(); + Set ignoredConnectionsSet = new HashSet<>(ignoredConnections); + addLocalNodeIfIgnored(ignoredConnectionsSet); + return connectionManager.getBestAvailableConnection(ignoredConnectionsSet.toArray(new MoneroRpcConnection[0])); + } + + private void addLocalNodeIfIgnored(Collection ignoredConnections) { + if (xmrLocalNode.shouldBeIgnored() && connectionManager.hasConnection(xmrLocalNode.getUri())) ignoredConnections.add(connectionManager.getConnectionByUri(xmrLocalNode.getUri())); + } + + private void switchToBestConnection() { + if (isFixedConnection() || !connectionManager.getAutoSwitch()) { + log.info("Skipping switch to best Monero connection because connection is fixed or auto switch is disabled"); + return; + } + MoneroRpcConnection bestConnection = getBestAvailableConnection(); + if (bestConnection != null) setConnection(bestConnection); + } + + public boolean requestSwitchToNextBestConnection() { + log.warn("Request made to switch to next best monerod, current monerod={}", getConnection() == null ? null : getConnection().getUri()); + + // skip if connection is fixed + if (isFixedConnection() || !connectionManager.getAutoSwitch()) { + log.info("Skipping switch to next best Monero connection because connection is fixed or auto switch is disabled"); + return false; + } + + // skip if last switch was too recent + boolean skipSwitch = System.currentTimeMillis() - lastSwitchRequestTimestamp < SKIP_SWITCH_WITHIN_MS; + lastSwitchRequestTimestamp = System.currentTimeMillis(); + if (skipSwitch) { + log.warn("Skipping switch to next best Monero connection because last switch was less than {} seconds ago", SKIP_SWITCH_WITHIN_MS / 1000); + lastSwitchRequestTimestamp = System.currentTimeMillis(); + return false; + } + + // try to get connection to switch to + MoneroRpcConnection currentConnection = getConnection(); + if (currentConnection != null) excludedConnections.add(currentConnection); + MoneroRpcConnection bestConnection = getBestAvailableConnection(excludedConnections); + + // remove from excluded connections after period + UserThread.runAfter(() -> { + if (currentConnection != null) excludedConnections.remove(currentConnection); + }, EXCLUDE_CONNECTION_SECONDS); + + // switch to best connection + if (bestConnection == null) { + log.warn("Could not get connection to switch to"); + return false; + } + setConnection(bestConnection); + return true; + } + public void setAutoSwitch(boolean autoSwitch) { accountService.checkAccountOpen(); connectionManager.setAutoSwitch(autoSwitch); @@ -505,7 +565,6 @@ public final class XmrConnectionService { // register connection listener connectionManager.addListener(this::onConnectionChanged); - isInitialized = true; } diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index e18d06ae..2be9a984 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -1057,6 +1057,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } catch (Exception e) { log.warn("Error creating split output tx to fund offer {} at subaddress {}, attempt={}/{}, error={}", openOffer.getShortId(), entry.getSubaddressIndex(), i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage()); if (stopped || i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (xmrConnectionService.isConnected()) xmrWalletService.requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } } diff --git a/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerReserveOfferFunds.java b/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerReserveOfferFunds.java index 61cafd65..36759cc1 100644 --- a/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerReserveOfferFunds.java +++ b/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerReserveOfferFunds.java @@ -89,6 +89,7 @@ public class MakerReserveOfferFunds extends Task { log.warn("Error creating reserve tx, attempt={}/{}, offerId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, openOffer.getShortId(), e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; model.getProtocol().startTimeoutTimer(); // reset protocol timeout + if (model.getXmrWalletService().getConnectionService().isConnected()) model.getXmrWalletService().requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index 42b701d3..4e3c38ae 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -478,6 +478,7 @@ public final class ArbitrationManager extends DisputeManager { - ThreadUtils.submitToPool(() -> { // TODO: remove this? - ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); - }); + ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); }); // reset buyer's payment sent state if no ack receive @@ -847,6 +846,14 @@ public abstract class Trade implements Tradable, Model { } } + public boolean requestSwitchToNextBestConnection() { + if (xmrConnectionService.requestSwitchToNextBestConnection()) { + onConnectionChanged(xmrConnectionService.getConnection()); // change connection on same thread + return true; + } + return false; + } + public boolean isIdling() { return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && pollNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden } @@ -884,69 +891,8 @@ public abstract class Trade implements Tradable, Model { }).start(); } - public void importMultisigHex() { - synchronized (walletLock) { - synchronized (HavenoUtils.getDaemonLock()) { // lock on daemon because import calls full refresh - for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { - try { - doImportMultisigHex(); - break; - } catch (IllegalArgumentException e) { - throw e; - } catch (Exception e) { - log.warn("Failed to import multisig hex, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); - if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; - HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying - } - } - } - } - } - - private void doImportMultisigHex() { - - // ensure wallet sees deposits confirmed - if (!isDepositsConfirmed()) syncAndPollWallet(); - - // collect multisig hex from peers - List multisigHexes = new ArrayList(); - for (TradePeer peer : getOtherPeers()) if (peer.getUpdatedMultisigHex() != null) multisigHexes.add(peer.getUpdatedMultisigHex()); - - // import multisig hex - log.info("Importing multisig hexes for {} {}, count={}", getClass().getSimpleName(), getShortId(), multisigHexes.size()); - long startTime = System.currentTimeMillis(); - if (!multisigHexes.isEmpty()) { - try { - wallet.importMultisigHex(multisigHexes.toArray(new String[0])); - } catch (MoneroError e) { - - // import multisig hex individually if one is invalid - if (isInvalidImportError(e.getMessage())) { - log.warn("Peer has invalid multisig hex for {} {}, importing individually", getClass().getSimpleName(), getShortId()); - boolean imported = false; - Exception lastError = null; - for (TradePeer peer : getOtherPeers()) { - if (peer.getUpdatedMultisigHex() == null) continue; - try { - wallet.importMultisigHex(peer.getUpdatedMultisigHex()); - imported = true; - } catch (MoneroError e2) { - lastError = e2; - if (isInvalidImportError(e2.getMessage())) { - log.warn("{} has invalid multisig hex for {} {}, error={}, multisigHex={}", getPeerRole(peer), getClass().getSimpleName(), getShortId(), e2.getMessage(), peer.getUpdatedMultisigHex()); - } else { - throw e2; - } - } - } - if (!imported) throw new IllegalArgumentException("Could not import any multisig hexes for " + getClass().getSimpleName() + " " + getShortId(), lastError); - } else { - throw e; - } - } - requestSaveWallet(); - } - log.info("Done importing multisig hexes for {} {} in {} ms, count={}", getClass().getSimpleName(), getShortId(), System.currentTimeMillis() - startTime, multisigHexes.size()); + private boolean isReadTimeoutError(String errMsg) { + return errMsg.contains("Read timed out"); } // TODO: checking error strings isn't robust, but the library doesn't provide a way to check if multisig hex is invalid. throw IllegalArgumentException from library on invalid multisig hex? @@ -962,7 +908,13 @@ public abstract class Trade implements Tradable, Model { } public void requestSaveWallet() { - ThreadUtils.submitToPool(() -> saveWallet()); // save wallet off main thread + + // save wallet off main thread + ThreadUtils.execute(() -> { + synchronized (walletLock) { + if (walletExists()) saveWallet(); + } + }, getId()); } public void saveWallet() { @@ -1109,6 +1061,104 @@ public abstract class Trade implements Tradable, Model { } } + public void importMultisigHex() { + synchronized (walletLock) { + synchronized (HavenoUtils.getDaemonLock()) { // lock on daemon because import calls full refresh + for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { + try { + doImportMultisigHex(); + break; + } catch (IllegalArgumentException | IllegalStateException e) { + throw e; + } catch (Exception e) { + log.warn("Failed to import multisig hex, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); + if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection(); + if (isReadTimeoutError(e.getMessage())) forceRestartTradeWallet(); // wallet can be stuck a while + HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying + } + } + } + } + } + + private void doImportMultisigHex() { + + // ensure wallet sees deposits confirmed + if (!isDepositsConfirmed()) syncAndPollWallet(); + + // collect multisig hex from peers + List multisigHexes = new ArrayList(); + for (TradePeer peer : getOtherPeers()) if (peer.getUpdatedMultisigHex() != null) multisigHexes.add(peer.getUpdatedMultisigHex()); + + // import multisig hex + log.info("Importing multisig hexes for {} {}, count={}", getClass().getSimpleName(), getShortId(), multisigHexes.size()); + long startTime = System.currentTimeMillis(); + if (!multisigHexes.isEmpty()) { + try { + wallet.importMultisigHex(multisigHexes.toArray(new String[0])); + + // check if import is still needed // TODO: we once received a multisig hex which was too short, causing import to still be needed + if (wallet.isMultisigImportNeeded()) { + String errorMessage = "Multisig import still needed for " + getClass().getSimpleName() + " " + getShortId() + " after already importing, multisigHexes=" + multisigHexes; + log.warn(errorMessage); + + // ignore multisig hex which is significantly shorter than others + int maxLength = 0; + boolean removed = false; + for (String hex : multisigHexes) maxLength = Math.max(maxLength, hex.length()); + for (String hex : new ArrayList<>(multisigHexes)) { + if (hex.length() < maxLength / 2) { + String ignoringMessage = "Ignoring multisig hex from " + getMultisigHexRole(hex) + " for " + getClass().getSimpleName() + " " + getShortId() + " because it is too short, multisigHex=" + hex; + setErrorMessage(ignoringMessage); + log.warn(ignoringMessage); + multisigHexes.remove(hex); + removed = true; + } + } + + // re-import valid multisig hexes + if (removed) wallet.importMultisigHex(multisigHexes.toArray(new String[0])); + if (wallet.isMultisigImportNeeded()) throw new IllegalStateException(errorMessage); + } + } catch (MoneroError e) { + + // import multisig hex individually if one is invalid + if (isInvalidImportError(e.getMessage())) { + log.warn("Peer has invalid multisig hex for {} {}, importing individually", getClass().getSimpleName(), getShortId()); + boolean imported = false; + Exception lastError = null; + for (TradePeer peer : getOtherPeers()) { + if (peer.getUpdatedMultisigHex() == null) continue; + try { + wallet.importMultisigHex(peer.getUpdatedMultisigHex()); + imported = true; + } catch (MoneroError e2) { + lastError = e2; + if (isInvalidImportError(e2.getMessage())) { + log.warn("{} has invalid multisig hex for {} {}, error={}, multisigHex={}", getPeerRole(peer), getClass().getSimpleName(), getShortId(), e2.getMessage(), peer.getUpdatedMultisigHex()); + } else { + throw e2; + } + } + } + if (!imported) throw new IllegalArgumentException("Could not import any multisig hexes for " + getClass().getSimpleName() + " " + getShortId(), lastError); + } else { + throw e; + } + } + requestSaveWallet(); + } + log.info("Done importing multisig hexes for {} {} in {} ms, count={}", getClass().getSimpleName(), getShortId(), System.currentTimeMillis() - startTime, multisigHexes.size()); + } + + private String getMultisigHexRole(String multisigHex) { + if (multisigHex.equals(getArbitrator().getUpdatedMultisigHex())) return "arbitrator"; + if (multisigHex.equals(getBuyer().getUpdatedMultisigHex())) return "buyer"; + if (multisigHex.equals(getSeller().getUpdatedMultisigHex())) return "seller"; + throw new IllegalArgumentException("Multisig hex does not belong to any peer"); + } + /** * Create the payout tx. * @@ -1125,9 +1175,12 @@ public abstract class Trade implements Tradable, Model { for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { try { return doCreatePayoutTx(); + } catch (IllegalArgumentException | IllegalStateException e) { + throw e; } catch (Exception e) { log.warn("Failed to create payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } } @@ -1139,14 +1192,10 @@ public abstract class Trade implements Tradable, Model { private MoneroTxWallet doCreatePayoutTx() { // check if multisig import needed - if (wallet.isMultisigImportNeeded()) throw new RuntimeException("Cannot create payout tx because multisig import is needed"); + if (wallet.isMultisigImportNeeded()) throw new IllegalStateException("Cannot create payout tx because multisig import is needed for " + getClass().getSimpleName() + " " + getShortId()); - // TODO: wallet sometimes returns empty data, after disconnect? - List txs = wallet.getTxs(); // TODO: this fetches from pool - if (txs.isEmpty()) { - log.warn("Restarting wallet for {} {} because deposit txs are missing to create payout tx", getClass().getSimpleName(), getId()); - forceRestartTradeWallet(); - } + // recover if missing wallet data + recoverIfMissingWalletData(); // gather info String sellerPayoutAddress = getSeller().getPayoutAddressString(); @@ -1184,11 +1233,15 @@ public abstract class Trade implements Tradable, Model { synchronized (HavenoUtils.getWalletFunctionLock()) { for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { try { + if (wallet.isMultisigImportNeeded()) throw new IllegalStateException("Cannot create dispute payout tx because multisig import is needed for " + getClass().getSimpleName() + " " + getShortId()); return createTx(txConfig); + } catch (IllegalArgumentException | IllegalStateException e) { + throw e; } catch (Exception e) { - if (e.getMessage().contains("not possible")) throw new RuntimeException("Loser payout is too small to cover the mining fee"); + if (e.getMessage().contains("not possible")) throw new IllegalArgumentException("Loser payout is too small to cover the mining fee"); log.warn("Failed to create dispute payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } } @@ -1205,23 +1258,30 @@ public abstract class Trade implements Tradable, Model { * @param publish publishes the signed payout tx if true */ public void processPayoutTx(String payoutTxHex, boolean sign, boolean publish) { - log.info("Processing payout tx for {} {}", getClass().getSimpleName(), getId()); - - // TODO: wallet sometimes returns empty data, after disconnect? detect this condition with failure tolerance - for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { - try { - List txs = wallet.getTxs(); // TODO: this fetches from pool - if (txs.isEmpty()) { - log.warn("Restarting wallet for {} {} because deposit txs are missing to process payout tx", getClass().getSimpleName(), getId()); - forceRestartTradeWallet(); + synchronized (walletLock) { + synchronized (HavenoUtils.getWalletFunctionLock()) { + for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { + try { + doProcessPayoutTx(payoutTxHex, sign, publish); + break; + } catch (IllegalArgumentException | IllegalStateException e) { + throw e; + } catch (Exception e) { + log.warn("Failed to process payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); + if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection(); + HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying + } } - break; - } catch (Exception e) { - log.warn("Failed get wallet txs, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); - if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; - HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } } + } + + private void doProcessPayoutTx(String payoutTxHex, boolean sign, boolean publish) { + log.info("Processing payout tx for {} {}", getClass().getSimpleName(), getId()); + + // recover if missing wallet data + recoverIfMissingWalletData(); // gather relevant info MoneroWallet wallet = getWallet(); @@ -1234,6 +1294,7 @@ public abstract class Trade implements Tradable, Model { MoneroTxSet describedTxSet = wallet.describeTxSet(new MoneroTxSet().setMultisigTxHex(payoutTxHex)); if (describedTxSet.getTxs() == null || describedTxSet.getTxs().size() != 1) throw new IllegalArgumentException("Bad payout tx"); // TODO (woodser): test nack MoneroTxWallet payoutTx = describedTxSet.getTxs().get(0); + if (payoutTxId == null) updatePayout(payoutTx); // update payout tx if not signed // verify payout tx has exactly 2 destinations if (payoutTx.getOutgoingTransfer() == null || payoutTx.getOutgoingTransfer().getDestinations() == null || payoutTx.getOutgoingTransfer().getDestinations().size() != 2) throw new IllegalArgumentException("Payout tx does not have exactly two destinations"); @@ -1265,10 +1326,11 @@ public abstract class Trade implements Tradable, Model { if (!sellerPayoutDestination.getAmount().equals(expectedSellerPayout)) throw new IllegalArgumentException("Seller destination amount is not deposit amount - trade amount - 1/2 tx costs, " + sellerPayoutDestination.getAmount() + " vs " + expectedSellerPayout); // check connection - if (sign || publish) verifyDaemonConnection(); + boolean doSign = sign && getPayoutTxHex() == null; + if (doSign || publish) verifyDaemonConnection(); // handle tx signing - if (sign) { + if (doSign) { // sign tx try { @@ -1283,6 +1345,7 @@ public abstract class Trade implements Tradable, Model { // describe result describedTxSet = wallet.describeMultisigTxSet(payoutTxHex); payoutTx = describedTxSet.getTxs().get(0); + updatePayout(payoutTx); // verify fee is within tolerance by recreating payout tx // TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated? @@ -1294,22 +1357,16 @@ public abstract class Trade implements Tradable, Model { log.info("Payout tx fee {} is within tolerance, diff %={}", payoutTx.getFee(), feeDiff); } - // update trade state - updatePayout(payoutTx); + // save trade state requestPersistence(); // submit payout tx if (publish) { - for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) { - try { - wallet.submitMultisigTxHex(payoutTxHex); - ThreadUtils.submitToPool(() -> pollWallet()); - break; - } catch (Exception e) { - log.warn("Failed to submit payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage()); - if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; - HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying - } + try { + wallet.submitMultisigTxHex(payoutTxHex); + setPayoutStatePublished(); + } catch (Exception e) { + throw new RuntimeException("Failed to submit payout tx for " + getClass().getSimpleName() + " " + getId(), e); } } } @@ -2244,10 +2301,6 @@ public abstract class Trade implements Tradable, Model { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private String getConnectionChangedThreadId() { - return getId() + ".onConnectionChanged"; - } - // lazy initialization private ObjectProperty getAmountProperty() { if (tradeAmountProperty == null) @@ -2263,6 +2316,10 @@ public abstract class Trade implements Tradable, Model { return tradeVolumeProperty; } + private String getConnectionChangedThreadId() { + return getId() + ".onConnectionChanged"; + } + private void onConnectionChanged(MoneroRpcConnection connection) { synchronized (walletLock) { @@ -2355,11 +2412,11 @@ public abstract class Trade implements Tradable, Model { } private void setPollPeriod(long pollPeriodMs) { - synchronized (walletLock) { + synchronized (pollLock) { if (this.isShutDownStarted) return; if (this.pollPeriodMs != null && this.pollPeriodMs == pollPeriodMs) return; this.pollPeriodMs = pollPeriodMs; - if (isPollInProgress()) { + if (isPolling()) { stopPolling(); startPolling(); } @@ -2372,8 +2429,8 @@ public abstract class Trade implements Tradable, Model { } private void startPolling() { - synchronized (walletLock) { - if (isShutDownStarted || isPollInProgress()) return; + synchronized (pollLock) { + if (isShutDownStarted || isPolling()) return; updatePollPeriod(); log.info("Starting to poll wallet for {} {}", getClass().getSimpleName(), getId()); pollLooper = new TaskLooper(() -> pollWallet()); @@ -2382,153 +2439,156 @@ public abstract class Trade implements Tradable, Model { } private void stopPolling() { - synchronized (walletLock) { - if (isPollInProgress()) { + synchronized (pollLock) { + if (isPolling()) { pollLooper.stop(); pollLooper = null; } } } - private boolean isPollInProgress() { - synchronized (walletLock) { + private boolean isPolling() { + synchronized (pollLock) { return pollLooper != null; } } private void pollWallet() { - if (pollInProgress) return; + synchronized (pollLock) { + if (pollInProgress) return; + } doPollWallet(); } private void doPollWallet() { + if (isShutDownStarted) return; synchronized (pollLock) { pollInProgress = true; - try { + } + try { - // skip if payout unlocked - if (isPayoutUnlocked()) return; + // skip if payout unlocked + if (isPayoutUnlocked()) return; - // skip if deposit txs unknown or not requested - if (processModel.getMaker().getDepositTxHash() == null || processModel.getTaker().getDepositTxHash() == null || !isDepositRequested()) return; + // skip if deposit txs unknown or not requested + if (processModel.getMaker().getDepositTxHash() == null || processModel.getTaker().getDepositTxHash() == null || !isDepositRequested()) return; - // skip if daemon not synced - if (xmrConnectionService.getTargetHeight() == null || !xmrConnectionService.isSyncedWithinTolerance()) return; + // skip if daemon not synced + if (xmrConnectionService.getTargetHeight() == null || !xmrConnectionService.isSyncedWithinTolerance()) return; - // sync if wallet too far behind daemon - if (walletHeight.get() < xmrConnectionService.getTargetHeight() - SYNC_EVERY_NUM_BLOCKS) syncWallet(false); + // sync if wallet too far behind daemon + if (walletHeight.get() < xmrConnectionService.getTargetHeight() - SYNC_EVERY_NUM_BLOCKS) syncWallet(false); - // update deposit txs - if (!isDepositsUnlocked()) { + // update deposit txs + if (!isDepositsUnlocked()) { - // sync wallet if behind - syncWalletIfBehind(); + // sync wallet if behind + syncWalletIfBehind(); - // get txs from trade wallet - MoneroTxQuery query = new MoneroTxQuery().setIncludeOutputs(true); - Boolean updatePool = !isDepositsConfirmed() && (getMaker().getDepositTx() == null || getTaker().getDepositTx() == null); - if (!updatePool) query.setInTxPool(false); // avoid updating from pool if possible - List txs; - if (!updatePool) txs = wallet.getTxs(query); - else { - synchronized (walletLock) { - synchronized (HavenoUtils.getDaemonLock()) { - txs = wallet.getTxs(query); - } - } - } - setDepositTxs(txs); - if (getMaker().getDepositTx() == null || getTaker().getDepositTx() == null) return; // skip if either deposit tx not seen - setStateDepositsSeen(); - - // set actual security deposits - if (getBuyer().getSecurityDeposit().longValueExact() == 0) { - BigInteger buyerSecurityDeposit = ((MoneroTxWallet) getBuyer().getDepositTx()).getIncomingAmount(); - BigInteger sellerSecurityDeposit = ((MoneroTxWallet) getSeller().getDepositTx()).getIncomingAmount().subtract(getAmount()); - getBuyer().setSecurityDeposit(buyerSecurityDeposit); - getSeller().setSecurityDeposit(sellerSecurityDeposit); - } - - // check for deposit txs confirmation - if (getMaker().getDepositTx().isConfirmed() && getTaker().getDepositTx().isConfirmed()) setStateDepositsConfirmed(); - - // check for deposit txs unlocked - if (getMaker().getDepositTx().getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK && getTaker().getDepositTx().getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK) { - setStateDepositsUnlocked(); - } - } - - // check for payout tx - if (isDepositsUnlocked()) { - - // determine if payout tx expected - boolean isPayoutExpected = isPaymentReceived() || hasPaymentReceivedMessage() || hasDisputeClosedMessage() || disputeState.ordinal() >= DisputeState.ARBITRATOR_SENT_DISPUTE_CLOSED_MSG.ordinal(); - - // sync wallet if payout expected or payout is published - if (isPayoutExpected || isPayoutPublished()) syncWalletIfBehind(); - - // rescan spent outputs to detect unconfirmed payout tx - if (isPayoutExpected && wallet.getBalance().compareTo(BigInteger.ZERO) > 0) { - try { - wallet.rescanSpent(); - } catch (Exception e) { - log.warn("Error rescanning spent outputs to detect payout tx for {} {}, errorMessage={}", getClass().getSimpleName(), getShortId(), e.getMessage()); - } - } - - // get txs from trade wallet - MoneroTxQuery query = new MoneroTxQuery().setIncludeOutputs(true); - boolean updatePool = isPayoutExpected && !isPayoutConfirmed(); - if (!updatePool) query.setInTxPool(false); // avoid updating from pool if possible - List txs = null; - if (!updatePool) txs = wallet.getTxs(query); - else { - synchronized (walletLock) { - synchronized (HavenoUtils.getDaemonLock()) { - txs = wallet.getTxs(query); - } - } - } - setDepositTxs(txs); - - // check if any outputs spent (observed on payout published) - boolean hasSpentOutput = false; - boolean hasFailedTx = false; - for (MoneroTxWallet tx : txs) { - if (tx.isFailed()) hasFailedTx = true; - for (MoneroOutputWallet output : tx.getOutputsWallet()) { - if (Boolean.TRUE.equals(output.isSpent())) hasSpentOutput = true; - } - } - if (hasSpentOutput) setPayoutStatePublished(); - else if (hasFailedTx && isPayoutPublished()) { - log.warn("{} {} is in payout published state but has failed tx and no spent outputs, resetting payout state to unpublished", getClass().getSimpleName(), getShortId()); - setPayoutState(PayoutState.PAYOUT_UNPUBLISHED); - } - - // check for outgoing txs (appears after wallet submits payout tx or on payout confirmed) - for (MoneroTxWallet tx : txs) { - if (tx.isOutgoing() && !tx.isFailed()) { - updatePayout(tx); - setPayoutStatePublished(); - if (tx.isConfirmed()) setPayoutStateConfirmed(); - if (!tx.isLocked()) setPayoutStateUnlocked(); - } - } - } - } catch (Exception e) { - boolean isConnectionRefused = e.getMessage() != null && e.getMessage().contains("Connection refused"); - if (isConnectionRefused) forceRestartTradeWallet(); + // get txs from trade wallet + MoneroTxQuery query = new MoneroTxQuery().setIncludeOutputs(true); + Boolean updatePool = !isDepositsConfirmed() && (getMaker().getDepositTx() == null || getTaker().getDepositTx() == null); + if (!updatePool) query.setInTxPool(false); // avoid updating from pool if possible + List txs; + if (!updatePool) txs = wallet.getTxs(query); else { - boolean isWalletConnected = isWalletConnectedToDaemon(); - if (!isShutDownStarted && wallet != null && isWalletConnected) { - log.warn("Error polling trade wallet for {} {}, errorMessage={}. Monerod={}", getClass().getSimpleName(), getShortId(), e.getMessage(), getXmrWalletService().getConnectionService().getConnection()); - //e.printStackTrace(); + synchronized (walletLock) { + synchronized (HavenoUtils.getDaemonLock()) { + txs = wallet.getTxs(query); + } } } - } finally { + setDepositTxs(txs); + if (getMaker().getDepositTx() == null || getTaker().getDepositTx() == null) return; // skip if either deposit tx not seen + setStateDepositsSeen(); + + // set actual security deposits + if (getBuyer().getSecurityDeposit().longValueExact() == 0) { + BigInteger buyerSecurityDeposit = ((MoneroTxWallet) getBuyer().getDepositTx()).getIncomingAmount(); + BigInteger sellerSecurityDeposit = ((MoneroTxWallet) getSeller().getDepositTx()).getIncomingAmount().subtract(getAmount()); + getBuyer().setSecurityDeposit(buyerSecurityDeposit); + getSeller().setSecurityDeposit(sellerSecurityDeposit); + } + + // check for deposit txs confirmation + if (getMaker().getDepositTx().isConfirmed() && getTaker().getDepositTx().isConfirmed()) setStateDepositsConfirmed(); + + // check for deposit txs unlocked + if (getMaker().getDepositTx().getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK && getTaker().getDepositTx().getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK) { + setStateDepositsUnlocked(); + } + } + + // check for payout tx + if (isDepositsUnlocked()) { + + // determine if payout tx expected + boolean isPayoutExpected = isPaymentReceived() || hasPaymentReceivedMessage() || hasDisputeClosedMessage() || disputeState.ordinal() >= DisputeState.ARBITRATOR_SENT_DISPUTE_CLOSED_MSG.ordinal(); + + // sync wallet if payout expected or payout is published + if (isPayoutExpected || isPayoutPublished()) syncWalletIfBehind(); + + // rescan spent outputs to detect unconfirmed payout tx + if (isPayoutExpected && wallet.getBalance().compareTo(BigInteger.ZERO) > 0) { + wallet.rescanSpent(); + } + + // get txs from trade wallet + MoneroTxQuery query = new MoneroTxQuery().setIncludeOutputs(true); + boolean updatePool = isPayoutExpected && !isPayoutConfirmed(); + if (!updatePool) query.setInTxPool(false); // avoid updating from pool if possible + List txs = null; + if (!updatePool) txs = wallet.getTxs(query); + else { + synchronized (walletLock) { + synchronized (HavenoUtils.getDaemonLock()) { + txs = wallet.getTxs(query); + } + } + } + setDepositTxs(txs); + + // check if any outputs spent (observed on payout published) + boolean hasSpentOutput = false; + boolean hasFailedTx = false; + for (MoneroTxWallet tx : txs) { + if (tx.isFailed()) hasFailedTx = true; + for (MoneroOutputWallet output : tx.getOutputsWallet()) { + if (Boolean.TRUE.equals(output.isSpent())) hasSpentOutput = true; + } + } + if (hasSpentOutput) setPayoutStatePublished(); + else if (hasFailedTx && isPayoutPublished()) { + log.warn("{} {} is in payout published state but has failed tx and no spent outputs, resetting payout state to unpublished", getClass().getSimpleName(), getShortId()); + setPayoutState(PayoutState.PAYOUT_UNPUBLISHED); + } + + // check for outgoing txs (appears after wallet submits payout tx or on payout confirmed) + for (MoneroTxWallet tx : txs) { + if (tx.isOutgoing() && !tx.isFailed()) { + updatePayout(tx); + setPayoutStatePublished(); + if (tx.isConfirmed()) setPayoutStateConfirmed(); + if (!tx.isLocked()) setPayoutStateUnlocked(); + } + } + } + } catch (Exception e) { + boolean isConnectionRefused = e.getMessage() != null && e.getMessage().contains("Connection refused"); + if (isConnectionRefused) forceRestartTradeWallet(); + else { + boolean isWalletConnected = isWalletConnectedToDaemon(); + if (!isShutDownStarted && wallet != null && isWalletConnected) { + log.warn("Error polling trade wallet for {} {}, errorMessage={}. Monerod={}", getClass().getSimpleName(), getShortId(), e.getMessage(), getXmrWalletService().getConnectionService().getConnection()); + requestSwitchToNextBestConnection(); + //e.printStackTrace(); + } + } + } finally { + synchronized (pollLock) { pollInProgress = false; } + requestSaveWallet(); } } @@ -2553,6 +2613,70 @@ public abstract class Trade implements Tradable, Model { depositTxsUpdateCounter.set(depositTxsUpdateCounter.get() + 1); } + // TODO: wallet is sometimes missing balance or deposits, due to specific daemon connections, not saving? + private void recoverIfMissingWalletData() { + synchronized (walletLock) { + if (isWalletMissingData()) { + log.warn("Wallet is missing data for {} {}, attempting to recover", getClass().getSimpleName(), getShortId()); + + // force restart wallet + forceRestartTradeWallet(); + + // rescan blockchain with global daemon lock + synchronized (HavenoUtils.getDaemonLock()) { + Long timeout = null; + try { + + // extend rpc timeout for rescan + if (wallet instanceof MoneroWalletRpc) { + timeout = ((MoneroWalletRpc) wallet).getRpcConnection().getTimeout(); + ((MoneroWalletRpc) wallet).getRpcConnection().setTimeout(EXTENDED_RPC_TIMEOUT); + } + + // rescan blockchain + log.warn("Rescanning blockchain for {} {}", getClass().getSimpleName(), getShortId()); + wallet.rescanBlockchain(); + } catch (Exception e) { + if (isReadTimeoutError(e.getMessage())) forceRestartTradeWallet(); // wallet can be stuck a while + throw e; + } finally { + + // restore rpc timeout + if (wallet instanceof MoneroWalletRpc) { + ((MoneroWalletRpc) wallet).getRpcConnection().setTimeout(timeout); + } + } + } + + // import multisig hex + log.warn("Importing multisig hex to recover wallet data for {} {}", getClass().getSimpleName(), getShortId()); + importMultisigHex(); + } + } + + // check again after releasing lock + if (isWalletMissingData()) throw new IllegalStateException("Wallet is still missing data after attempting recovery for " + getClass().getSimpleName() + " " + getShortId()); + } + + private boolean isWalletMissingData() { + synchronized (walletLock) { + if (!isDepositsUnlocked() || isPayoutPublished()) return false; + if (getMakerDepositTx() == null) { + log.warn("Missing maker deposit tx for {} {}", getClass().getSimpleName(), getId()); + return true; + } + if (getTakerDepositTx() == null) { + log.warn("Missing taker deposit tx for {} {}", getClass().getSimpleName(), getId()); + return true; + } + if (wallet.getBalance().equals(BigInteger.ZERO)) { + log.warn("Wallet balance is zero for {} {}", getClass().getSimpleName(), getId()); + return true; + } + return false; + } + } + private void forceRestartTradeWallet() { if (isShutDownStarted || restartInProgress) return; log.warn("Force restarting trade wallet for {} {}", getClass().getSimpleName(), getId()); @@ -2560,7 +2684,7 @@ public abstract class Trade implements Tradable, Model { forceCloseWallet(); if (!isShutDownStarted) wallet = getWallet(); restartInProgress = false; - doPollWallet(); + pollWallet(); if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId()); } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/MaybeSendSignContractRequest.java b/core/src/main/java/haveno/core/trade/protocol/tasks/MaybeSendSignContractRequest.java index 353c167a..bcc75132 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/MaybeSendSignContractRequest.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/MaybeSendSignContractRequest.java @@ -105,6 +105,7 @@ public class MaybeSendSignContractRequest extends TradeTask { } catch (Exception e) { log.warn("Error creating deposit tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, trade.getShortId(), e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (trade.getXmrConnectionService().isConnected()) trade.getXmrWalletService().requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/TakerReserveTradeFunds.java b/core/src/main/java/haveno/core/trade/protocol/tasks/TakerReserveTradeFunds.java index 6e0719c1..0f0dd1cb 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/TakerReserveTradeFunds.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/TakerReserveTradeFunds.java @@ -71,6 +71,7 @@ public class TakerReserveTradeFunds extends TradeTask { } catch (Exception e) { log.warn("Error creating reserve tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, trade.getShortId(), e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (trade.getXmrConnectionService().isConnected()) trade.getXmrWalletService().requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } diff --git a/core/src/main/java/haveno/core/xmr/Balances.java b/core/src/main/java/haveno/core/xmr/Balances.java index 45525e76..cfae1cb5 100644 --- a/core/src/main/java/haveno/core/xmr/Balances.java +++ b/core/src/main/java/haveno/core/xmr/Balances.java @@ -35,6 +35,8 @@ package haveno.core.xmr; import com.google.inject.Inject; + +import haveno.common.ThreadUtils; import haveno.common.UserThread; import haveno.core.api.model.XmrBalanceInfo; import haveno.core.offer.OpenOffer; @@ -103,7 +105,7 @@ public class Balances { updateBalances(); } }); - updateBalances(); + doUpdateBalances(); } public XmrBalanceInfo getBalances() { @@ -117,42 +119,48 @@ public class Balances { } private void updateBalances() { + ThreadUtils.submitToPool(() -> doUpdateBalances()); + } + + private void doUpdateBalances() { synchronized (this) { - - // get wallet balances - BigInteger balance = xmrWalletService.getWallet() == null ? BigInteger.ZERO : xmrWalletService.getBalance(); - availableBalance = xmrWalletService.getWallet() == null ? BigInteger.ZERO : xmrWalletService.getAvailableBalance(); + synchronized (XmrWalletService.WALLET_LOCK) { - // calculate pending balance by adding frozen trade balances - reserved amounts - pendingBalance = balance.subtract(availableBalance); - List trades = tradeManager.getTradesStreamWithFundsLockedIn().collect(Collectors.toList()); - for (Trade trade : trades) { - if (trade.getFrozenAmount().equals(new BigInteger("0"))) continue; - BigInteger tradeFee = trade instanceof MakerTrade ? trade.getMakerFee() : trade.getTakerFee(); - pendingBalance = pendingBalance.add(trade.getFrozenAmount()).subtract(trade.getReservedAmount()).subtract(tradeFee).subtract(trade.getSelf().getDepositTxFee()); + // get wallet balances + BigInteger balance = xmrWalletService.getWallet() == null ? BigInteger.ZERO : xmrWalletService.getBalance(); + availableBalance = xmrWalletService.getWallet() == null ? BigInteger.ZERO : xmrWalletService.getAvailableBalance(); + + // calculate pending balance by adding frozen trade balances - reserved amounts + pendingBalance = balance.subtract(availableBalance); + List trades = tradeManager.getTradesStreamWithFundsLockedIn().collect(Collectors.toList()); + for (Trade trade : trades) { + if (trade.getFrozenAmount().equals(new BigInteger("0"))) continue; + BigInteger tradeFee = trade instanceof MakerTrade ? trade.getMakerFee() : trade.getTakerFee(); + pendingBalance = pendingBalance.add(trade.getFrozenAmount()).subtract(trade.getReservedAmount()).subtract(tradeFee).subtract(trade.getSelf().getDepositTxFee()); + } + + // calculate reserved offer balance + reservedOfferBalance = BigInteger.ZERO; + if (xmrWalletService.getWallet() != null) { + List frozenOutputs = xmrWalletService.getOutputs(new MoneroOutputQuery().setIsFrozen(true).setIsSpent(false)); + for (MoneroOutputWallet frozenOutput : frozenOutputs) reservedOfferBalance = reservedOfferBalance.add(frozenOutput.getAmount()); + } + for (Trade trade : trades) { + reservedOfferBalance = reservedOfferBalance.subtract(trade.getFrozenAmount()); // subtract frozen trade balances + } + + // calculate reserved trade balance + reservedTradeBalance = BigInteger.ZERO; + for (Trade trade : trades) { + reservedTradeBalance = reservedTradeBalance.add(trade.getReservedAmount()); + } + + // calculate reserved balance + reservedBalance = reservedOfferBalance.add(reservedTradeBalance); + + // notify balance update + UserThread.execute(() -> updateCounter.set(updateCounter.get() + 1)); } - - // calculate reserved offer balance - reservedOfferBalance = BigInteger.ZERO; - if (xmrWalletService.getWallet() != null) { - List frozenOutputs = xmrWalletService.getOutputs(new MoneroOutputQuery().setIsFrozen(true).setIsSpent(false)); - for (MoneroOutputWallet frozenOutput : frozenOutputs) reservedOfferBalance = reservedOfferBalance.add(frozenOutput.getAmount()); - } - for (Trade trade : trades) { - reservedOfferBalance = reservedOfferBalance.subtract(trade.getFrozenAmount()); // subtract frozen trade balances - } - - // calculate reserved trade balance - reservedTradeBalance = BigInteger.ZERO; - for (Trade trade : trades) { - reservedTradeBalance = reservedTradeBalance.add(trade.getReservedAmount()); - } - - // calculate reserved balance - reservedBalance = reservedOfferBalance.add(reservedTradeBalance); - - // notify balance update - UserThread.execute(() -> updateCounter.set(updateCounter.get() + 1)); } } } diff --git a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java index 17883a36..d135526a 100644 --- a/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java +++ b/core/src/main/java/haveno/core/xmr/wallet/XmrWalletService.java @@ -24,6 +24,7 @@ import com.google.inject.name.Named; import common.utils.JsonUtils; import haveno.common.ThreadUtils; +import haveno.common.Timer; import haveno.common.UserThread; import haveno.common.config.Config; import haveno.common.file.FileUtil; @@ -67,6 +68,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import javafx.beans.property.LongProperty; @@ -155,14 +157,16 @@ public class XmrWalletService { private TradeManager tradeManager; private MoneroWallet wallet; public static final Object WALLET_LOCK = new Object(); - private boolean wasWalletSynced = false; + private boolean wasWalletSynced; private final Map> txCache = new HashMap>(); - private boolean isClosingWallet = false; - private boolean isShutDownStarted = false; + private boolean isClosingWallet; + private boolean isShutDownStarted; private ExecutorService syncWalletThreadPool = Executors.newFixedThreadPool(10); // TODO: adjust based on connection type - private Long syncStartHeight = null; - private TaskLooper syncWithProgressLooper = null; - CountDownLatch syncWithProgressLatch; + private Long syncStartHeight; + private TaskLooper syncProgressLooper; + private CountDownLatch syncProgressLatch; + private Timer syncProgressTimeout; + private static final int SYNC_PROGRESS_TIMEOUT_SECONDS = 45; // wallet polling and cache private TaskLooper pollLooper; @@ -933,7 +937,7 @@ public class XmrWalletService { e.printStackTrace(); // force close wallet - forceCloseWallet(wallet, getWalletPath(MONERO_WALLET_NAME)); + forceCloseMainWallet(); } log.info("Done shutting down {}", getClass().getSimpleName()); @@ -1281,22 +1285,7 @@ public class XmrWalletService { else log.info(appliedMsg); // listen for connection changes - xmrConnectionService.addConnectionListener(connection -> { - - // force restart main wallet if connection changed before synced - if (!wasWalletSynced) { - if (!Boolean.TRUE.equals(xmrConnectionService.isConnected())) return; - ThreadUtils.submitToPool(() -> { - log.warn("Force restarting main wallet because connection changed before inital sync"); - forceRestartMainWallet(); - }); - return; - } else { - - // apply connection changes - ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID); - } - }); + xmrConnectionService.addConnectionListener(connection -> ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID)); // initialize main wallet when daemon synced walletInitListener = (obs, oldVal, newVal) -> initMainWalletIfConnected(); @@ -1305,111 +1294,110 @@ public class XmrWalletService { } private void initMainWalletIfConnected() { - ThreadUtils.execute(() -> { - synchronized (WALLET_LOCK) { - if (wallet == null && xmrConnectionService.downloadPercentageProperty().get() == 1 && !isShutDownStarted) { - maybeInitMainWallet(true); - if (walletInitListener != null) xmrConnectionService.downloadPercentageProperty().removeListener(walletInitListener); - } - } - }, THREAD_ID); + if (wallet == null && xmrConnectionService.downloadPercentageProperty().get() == 1 && !isShutDownStarted) { + maybeInitMainWallet(true); + } } private void maybeInitMainWallet(boolean sync) { - try { - maybeInitMainWallet(sync, MAX_SYNC_ATTEMPTS); - } catch (Exception e) { - log.warn("Error initializing main wallet: " + e.getMessage()); - e.printStackTrace(); - HavenoUtils.havenoSetup.getTopErrorMsg().set(e.getMessage()); - throw e; - } + maybeInitMainWallet(sync, MAX_SYNC_ATTEMPTS); } private void maybeInitMainWallet(boolean sync, int numAttempts) { - synchronized (WALLET_LOCK) { - if (isShutDownStarted) return; - - // open or create wallet main wallet - if (wallet == null) { - MoneroDaemonRpc daemon = xmrConnectionService.getDaemon(); - log.info("Initializing main wallet with monerod=" + (daemon == null ? "null" : daemon.getRpcConnection().getUri())); - if (MoneroUtils.walletExists(xmrWalletFile.getPath())) { - wallet = openWallet(MONERO_WALLET_NAME, rpcBindPort, isProxyApplied(wasWalletSynced)); - } else if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) { - wallet = createWallet(MONERO_WALLET_NAME, rpcBindPort); - - // set wallet creation date to yesterday to guarantee complete restore - LocalDateTime localDateTime = LocalDate.now().atStartOfDay().minusDays(1); - long date = localDateTime.toEpochSecond(ZoneOffset.UTC); - user.setWalletCreationDate(date); + ThreadUtils.execute(() -> { + synchronized (WALLET_LOCK) { + if (isShutDownStarted) return; + + // open or create wallet main wallet + if (wallet == null) { + MoneroDaemonRpc daemon = xmrConnectionService.getDaemon(); + log.info("Initializing main wallet with monerod=" + (daemon == null ? "null" : daemon.getRpcConnection().getUri())); + if (MoneroUtils.walletExists(xmrWalletFile.getPath())) { + wallet = openWallet(MONERO_WALLET_NAME, rpcBindPort, isProxyApplied(wasWalletSynced)); + } else if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) { + wallet = createWallet(MONERO_WALLET_NAME, rpcBindPort); + + // set wallet creation date to yesterday to guarantee complete restore + LocalDateTime localDateTime = LocalDate.now().atStartOfDay().minusDays(1); + long date = localDateTime.toEpochSecond(ZoneOffset.UTC); + user.setWalletCreationDate(date); + } + isClosingWallet = false; } - isClosingWallet = false; - } + + // sync wallet and register listener + if (wallet != null && !isShutDownStarted) { + log.info("Monero wallet path={}", wallet.getPath()); + + // sync main wallet if applicable + if (sync && numAttempts > 0) { + try { - // sync wallet and register listener - if (wallet != null && !isShutDownStarted) { - log.info("Monero wallet path={}", wallet.getPath()); + // switch connection if disconnected + if (!wallet.isConnectedToDaemon()) { + log.warn("Switching connection before syncing with progress because disconnected"); + if (requestSwitchToNextBestConnection()) return; // calls back to this method + } + + // sync main wallet + log.info("Syncing main wallet"); + long time = System.currentTimeMillis(); + syncWithProgress(); // blocking + log.info("Done syncing main wallet in " + (System.currentTimeMillis() - time) + " ms"); - // sync main wallet if applicable - if (sync && numAttempts > 0) { - try { - - // sync main wallet - log.info("Syncing main wallet"); - long time = System.currentTimeMillis(); - syncWithProgress(); // blocking - log.info("Done syncing main wallet in " + (System.currentTimeMillis() - time) + " ms"); - doPollWallet(true); - - // log wallet balances - if (getMoneroNetworkType() != MoneroNetworkType.MAINNET) { - BigInteger balance = getBalance(); - BigInteger unlockedBalance = getAvailableBalance(); - log.info("Monero wallet unlocked balance={}, pending balance={}, total balance={}", unlockedBalance, balance.subtract(unlockedBalance), balance); - } - - // reapply connection after wallet synced - onConnectionChanged(xmrConnectionService.getConnection()); - - // reset internal state if main wallet was swapped - resetIfWalletChanged(); - - // signal that main wallet is synced - doneDownload(); - - // notify setup that main wallet is initialized - // TODO: app fully initializes after this is set to true, even though wallet might not be initialized if unconnected. wallet will be created when connection detected - // refactor startup to call this and sync off main thread? but the calls to e.g. getBalance() fail with 'wallet and network is not yet initialized' - HavenoUtils.havenoSetup.getWalletInitialized().set(true); - - // save but skip backup on initialization - saveMainWallet(false); - } catch (Exception e) { - if (isClosingWallet || isShutDownStarted || HavenoUtils.havenoSetup.getWalletInitialized().get()) return; // ignore if wallet closing, shut down started, or app already initialized - log.warn("Error initially syncing main wallet: {}", e.getMessage()); - if (numAttempts <= 1) { - log.warn("Failed to sync main wallet. Opening app without syncing", numAttempts); + // poll wallet + doPollWallet(true); + if (walletInitListener != null) xmrConnectionService.downloadPercentageProperty().removeListener(walletInitListener); + + // log wallet balances + if (getMoneroNetworkType() != MoneroNetworkType.MAINNET) { + BigInteger balance = getBalance(); + BigInteger unlockedBalance = getAvailableBalance(); + log.info("Monero wallet unlocked balance={}, pending balance={}, total balance={}", unlockedBalance, balance.subtract(unlockedBalance), balance); + } + + // reapply connection after wallet synced (might reinitialize wallet on new thread) + ThreadUtils.execute(() -> onConnectionChanged(xmrConnectionService.getConnection()), THREAD_ID); + + // reset internal state if main wallet was swapped + resetIfWalletChanged(); + + // signal that main wallet is synced + doneDownload(); + + // notify setup that main wallet is initialized + // TODO: app fully initializes after this is set to true, even though wallet might not be initialized if unconnected. wallet will be created when connection detected + // refactor startup to call this and sync off main thread? but the calls to e.g. getBalance() fail with 'wallet and network is not yet initialized' HavenoUtils.havenoSetup.getWalletInitialized().set(true); + + // save but skip backup on initialization saveMainWallet(false); - - // reschedule to init main wallet - UserThread.runAfter(() -> { - ThreadUtils.execute(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID); - }, xmrConnectionService.getRefreshPeriodMs() / 1000); - } else { - log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000); - UserThread.runAfter(() -> { - ThreadUtils.execute(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID); - }, xmrConnectionService.getRefreshPeriodMs() / 1000); + } catch (Exception e) { + if (isClosingWallet || isShutDownStarted || HavenoUtils.havenoSetup.getWalletInitialized().get()) return; // ignore if wallet closing, shut down started, or app already initialized + log.warn("Error initially syncing main wallet: {}", e.getMessage()); + if (numAttempts <= 1) { + log.warn("Failed to sync main wallet. Opening app without syncing", numAttempts); + HavenoUtils.havenoSetup.getWalletInitialized().set(true); + saveMainWallet(false); + + // reschedule to init main wallet + UserThread.runAfter(() -> { + maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS); + }, xmrConnectionService.getRefreshPeriodMs() / 1000); + } else { + log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000); + UserThread.runAfter(() -> { + maybeInitMainWallet(true, numAttempts - 1); + }, xmrConnectionService.getRefreshPeriodMs() / 1000); + } } } + + // start polling main wallet + startPolling(); } - - // start polling main wallet - startPolling(); } - } + }, THREAD_ID); } private void resetIfWalletChanged() { @@ -1431,6 +1419,9 @@ public class XmrWalletService { private void syncWithProgress() { + // start sync progress timeout + resetSyncProgressTimeout(); + // show sync progress updateSyncProgress(wallet.getHeight()); @@ -1458,8 +1449,8 @@ public class XmrWalletService { // poll wallet for progress wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs()); - syncWithProgressLatch = new CountDownLatch(1); - syncWithProgressLooper = new TaskLooper(() -> { + syncProgressLatch = new CountDownLatch(1); + syncProgressLooper = new TaskLooper(() -> { if (wallet == null) return; long height = 0; try { @@ -1470,29 +1461,22 @@ public class XmrWalletService { } if (height < xmrConnectionService.getTargetHeight()) updateSyncProgress(height); else { - syncWithProgressLooper.stop(); + syncProgressLooper.stop(); wasWalletSynced = true; updateSyncProgress(height); - syncWithProgressLatch.countDown(); + syncProgressLatch.countDown(); } }); - syncWithProgressLooper.start(1000); - HavenoUtils.awaitLatch(syncWithProgressLatch); + syncProgressLooper.start(1000); + HavenoUtils.awaitLatch(syncProgressLatch); wallet.stopSyncing(); if (!wasWalletSynced) throw new IllegalStateException("Failed to sync wallet with progress"); } - private void stopSyncWithProgress() { - if (syncWithProgressLooper != null) { - syncWithProgressLooper.stop(); - syncWithProgressLooper = null; - syncWithProgressLatch.countDown(); - } - } - private void updateSyncProgress(long height) { UserThread.execute(() -> { walletHeight.set(height); + resetSyncProgressTimeout(); // new wallet reports height 1 before synced if (height == 1) { @@ -1509,6 +1493,18 @@ public class XmrWalletService { }); } + private synchronized void resetSyncProgressTimeout() { + if (syncProgressTimeout != null) syncProgressTimeout.stop(); + syncProgressTimeout = UserThread.runAfter(() -> { + if (isShutDownStarted || wasWalletSynced) return; + log.warn("Sync progress timeout called"); + forceCloseMainWallet(); + requestSwitchToNextBestConnection(); + maybeInitMainWallet(true); + resetSyncProgressTimeout(); + }, SYNC_PROGRESS_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + private MoneroWalletFull createWalletFull(MoneroWalletConfig config) { // must be connected to daemon @@ -1545,7 +1541,7 @@ public class XmrWalletService { // open wallet config.setNetworkType(getMoneroNetworkType()); config.setServer(connection); - log.info("Opening full wallet " + config.getPath() + " with monerod=" + connection.getUri()); + log.info("Opening full wallet " + config.getPath() + " with monerod=" + connection.getUri() + ", proxyUri=" + connection.getProxyUri()); walletFull = MoneroWalletFull.openWallet(config); if (walletFull.getDaemonConnection() != null) walletFull.getDaemonConnection().setPrintStackTrace(PRINT_RPC_STACK_TRACE); log.info("Done opening full wallet " + config.getPath()); @@ -1605,7 +1601,7 @@ public class XmrWalletService { if (!applyProxyUri) connection.setProxyUri(null); // open wallet - log.info("Opening RPC wallet " + config.getPath() + " with monerod=" + connection.getUri()); + log.info("Opening RPC wallet " + config.getPath() + " with monerod=" + connection.getUri() + ", proxyUri=" + connection.getProxyUri()); config.setServer(connection); walletRpc.openWallet(config); if (walletRpc.getDaemonConnection() != null) walletRpc.getDaemonConnection().setPrintStackTrace(PRINT_RPC_STACK_TRACE); @@ -1667,20 +1663,37 @@ public class XmrWalletService { String oldProxyUri = wallet == null || wallet.getDaemonConnection() == null ? null : wallet.getDaemonConnection().getProxyUri(); String newProxyUri = connection == null ? null : connection.getProxyUri(); log.info("Setting daemon connection for main wallet: uri={}, proxyUri={}", connection == null ? null : connection.getUri(), newProxyUri); + + // force restart main wallet if connection changed before synced + if (!wasWalletSynced) { + if (!Boolean.TRUE.equals(xmrConnectionService.isConnected())) return; + log.warn("Force restarting main wallet because connection changed before inital sync"); + forceRestartMainWallet(); + return; + } + + // update connection if (wallet instanceof MoneroWalletRpc) { if (StringUtils.equals(oldProxyUri, newProxyUri)) { wallet.setDaemonConnection(connection); } else { - log.info("Restarting main wallet because proxy URI has changed, old={}, new={}", oldProxyUri, newProxyUri); + log.info("Restarting main wallet because proxy URI has changed, old={}, new={}", oldProxyUri, newProxyUri); // TODO: set proxy without restarting wallet closeMainWallet(true); maybeInitMainWallet(false); + return; // wallet is re-initialized } } else { wallet.setDaemonConnection(connection); wallet.setProxyUri(connection.getProxyUri()); } - // sync wallet on new thread + // switch if wallet disconnected + if (Boolean.TRUE.equals(connection.isConnected() && !wallet.isConnectedToDaemon())) { + log.warn("Switching to next best connection because main wallet is disconnected"); + if (requestSwitchToNextBestConnection()) return; // calls back to this method + } + + // update poll period if (connection != null && !isShutDownStarted) { wallet.getDaemonConnection().setPrintStackTrace(PRINT_RPC_STACK_TRACE); updatePollPeriod(); @@ -1735,25 +1748,21 @@ public class XmrWalletService { } private void forceCloseMainWallet() { + stopPolling(); isClosingWallet = true; forceCloseWallet(wallet, getWalletPath(MONERO_WALLET_NAME)); - stopPolling(); - stopSyncWithProgress(); wallet = null; } private void forceRestartMainWallet() { log.warn("Force restarting main wallet"); forceCloseMainWallet(); - synchronized (WALLET_LOCK) { - maybeInitMainWallet(true); - } + maybeInitMainWallet(true); } private void startPolling() { synchronized (WALLET_LOCK) { - if (isShutDownStarted || isPollInProgress()) return; - log.info("Starting to poll main wallet"); + if (isShutDownStarted || isPolling()) return; updatePollPeriod(); pollLooper = new TaskLooper(() -> pollWallet()); pollLooper.start(pollPeriodMs); @@ -1761,13 +1770,13 @@ public class XmrWalletService { } private void stopPolling() { - if (isPollInProgress()) { + if (isPolling()) { pollLooper.stop(); pollLooper = null; } } - private boolean isPollInProgress() { + private boolean isPolling() { return pollLooper != null; } @@ -1785,7 +1794,7 @@ public class XmrWalletService { if (this.isShutDownStarted) return; if (this.pollPeriodMs != null && this.pollPeriodMs == pollPeriodMs) return; this.pollPeriodMs = pollPeriodMs; - if (isPollInProgress()) { + if (isPolling()) { stopPolling(); startPolling(); } @@ -1793,69 +1802,73 @@ public class XmrWalletService { } private void pollWallet() { - if (pollInProgress) return; + synchronized (pollLock) { + if (pollInProgress) return; + } doPollWallet(true); } private void doPollWallet(boolean updateTxs) { synchronized (pollLock) { - if (isShutDownStarted) return; pollInProgress = true; - try { + } + if (isShutDownStarted) return; + try { - // skip if daemon not synced - MoneroDaemonInfo lastInfo = xmrConnectionService.getLastInfo(); - if (lastInfo == null) { - log.warn("Last daemon info is null"); - return; - } - if (!xmrConnectionService.isSyncedWithinTolerance()) { - log.warn("Monero daemon is not synced within tolerance, height={}, targetHeight={}", xmrConnectionService.chainHeightProperty().get(), xmrConnectionService.getTargetHeight()); - return; - } + // skip if daemon not synced + MoneroDaemonInfo lastInfo = xmrConnectionService.getLastInfo(); + if (lastInfo == null) { + log.warn("Last daemon info is null"); + return; + } + if (!xmrConnectionService.isSyncedWithinTolerance()) { + log.warn("Monero daemon is not synced within tolerance, height={}, targetHeight={}", xmrConnectionService.chainHeightProperty().get(), xmrConnectionService.getTargetHeight()); + return; + } - // switch to best connection if wallet is too far behind - if (wasWalletSynced && walletHeight.get() < xmrConnectionService.getTargetHeight() - NUM_BLOCKS_BEHIND_TOLERANCE && !Config.baseCurrencyNetwork().isTestnet()) { - log.warn("Updating connection because main wallet is {} blocks behind monerod, wallet height={}, monerod height={}", xmrConnectionService.getTargetHeight() - walletHeight.get(), walletHeight.get(), lastInfo.getHeight()); - xmrConnectionService.switchToBestConnection(); - } + // switch to best connection if wallet is too far behind + if (wasWalletSynced && walletHeight.get() < xmrConnectionService.getTargetHeight() - NUM_BLOCKS_BEHIND_TOLERANCE && !Config.baseCurrencyNetwork().isTestnet()) { + log.warn("Updating connection because main wallet is {} blocks behind monerod, wallet height={}, monerod height={}", xmrConnectionService.getTargetHeight() - walletHeight.get(), walletHeight.get(), lastInfo.getHeight()); + if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection(); + } - // sync wallet if behind daemon - if (walletHeight.get() < xmrConnectionService.getTargetHeight()) { - synchronized (WALLET_LOCK) { // avoid long sync from blocking other operations - syncMainWallet(); - } + // sync wallet if behind daemon + if (walletHeight.get() < xmrConnectionService.getTargetHeight()) { + synchronized (WALLET_LOCK) { // avoid long sync from blocking other operations + syncMainWallet(); } + } - // fetch transactions from pool and store to cache - // TODO: ideally wallet should sync every poll and then avoid updating from pool on fetching txs? - if (updateTxs) { - synchronized (WALLET_LOCK) { // avoid long fetch from blocking other operations - synchronized (HavenoUtils.getDaemonLock()) { - try { - cachedTxs = wallet.getTxs(new MoneroTxQuery().setIncludeOutputs(true)); - lastPollSuccessTimestamp = System.currentTimeMillis(); - } catch (Exception e) { // fetch from pool can fail - if (!isShutDownStarted) { - if (lastPollSuccessTimestamp == null || System.currentTimeMillis() - lastPollSuccessTimestamp > LOG_POLL_ERROR_AFTER_MS) { // only log if not recently successful - log.warn("Error polling main wallet's transactions from the pool: {}", e.getMessage()); - } + // fetch transactions from pool and store to cache + // TODO: ideally wallet should sync every poll and then avoid updating from pool on fetching txs? + if (updateTxs) { + synchronized (WALLET_LOCK) { // avoid long fetch from blocking other operations + synchronized (HavenoUtils.getDaemonLock()) { + try { + cachedTxs = wallet.getTxs(new MoneroTxQuery().setIncludeOutputs(true)); + lastPollSuccessTimestamp = System.currentTimeMillis(); + } catch (Exception e) { // fetch from pool can fail + if (!isShutDownStarted) { + if (lastPollSuccessTimestamp == null || System.currentTimeMillis() - lastPollSuccessTimestamp > LOG_POLL_ERROR_AFTER_MS) { // only log if not recently successful + log.warn("Error polling main wallet's transactions from the pool: {}", e.getMessage()); } } } } } - } catch (Exception e) { - if (wallet == null || isShutDownStarted) return; - boolean isConnectionRefused = e.getMessage() != null && e.getMessage().contains("Connection refused"); - if (isConnectionRefused) forceRestartMainWallet(); - else if (isWalletConnectedToDaemon()) { - log.warn("Error polling main wallet, errorMessage={}. Monerod={}", e.getMessage(), getConnectionService().getConnection()); - //e.printStackTrace(); - } - } finally { + } + } catch (Exception e) { + if (wallet == null || isShutDownStarted) return; + boolean isConnectionRefused = e.getMessage() != null && e.getMessage().contains("Connection refused"); + if (isConnectionRefused) forceRestartMainWallet(); + else if (isWalletConnectedToDaemon()) { + log.warn("Error polling main wallet, errorMessage={}. Monerod={}", e.getMessage(), getConnectionService().getConnection()); + //e.printStackTrace(); + } + } finally { - // cache wallet info last + // cache wallet info last + synchronized (WALLET_LOCK) { if (wallet != null && !isShutDownStarted) { try { cacheWalletInfo(); @@ -1863,6 +1876,9 @@ public class XmrWalletService { e.printStackTrace(); } } + } + + synchronized (pollLock) { pollInProgress = false; } } @@ -1887,6 +1903,10 @@ public class XmrWalletService { } } + public boolean requestSwitchToNextBestConnection() { + return xmrConnectionService.requestSwitchToNextBestConnection(); + } + private void onNewBlock(long height) { UserThread.execute(() -> { walletHeight.set(height); diff --git a/desktop/src/main/java/haveno/desktop/main/funds/withdrawal/WithdrawalView.java b/desktop/src/main/java/haveno/desktop/main/funds/withdrawal/WithdrawalView.java index 44a66881..cab5d715 100644 --- a/desktop/src/main/java/haveno/desktop/main/funds/withdrawal/WithdrawalView.java +++ b/desktop/src/main/java/haveno/desktop/main/funds/withdrawal/WithdrawalView.java @@ -270,6 +270,7 @@ public class WithdrawalView extends ActivatableView { if (isNotEnoughMoney(e.getMessage())) throw e; log.warn("Error creating creating withdraw tx, attempt={}/{}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage()); if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e; + if (xmrWalletService.getConnectionService().isConnected()) xmrWalletService.requestSwitchToNextBestConnection(); HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying } }