switch to next best monerod on various errors

This commit is contained in:
woodser 2024-07-17 09:56:22 -04:00
parent 33bf54bcac
commit 06b0c20bad
11 changed files with 677 additions and 459 deletions

View file

@ -47,6 +47,7 @@ public class ThreadUtils {
synchronized (THREADS) {
THREADS.put(threadId, Thread.currentThread());
}
Thread.currentThread().setName(threadId);
command.run();
});
}

View file

@ -36,7 +36,10 @@ import haveno.network.Socks5ProxyProvider;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.P2PServiceListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.LongProperty;
@ -103,6 +106,12 @@ public final class XmrConnectionService {
private boolean isShutDownStarted;
private List<MoneroConnectionManagerListener> listeners = new ArrayList<>();
// connection switching
private static final int EXCLUDE_CONNECTION_SECONDS = 300;
private static final int SKIP_SWITCH_WITHIN_MS = 60000;
private Set<MoneroRpcConnection> excludedConnections = new HashSet<>();
private long lastSwitchRequestTimestamp;
@Inject
public XmrConnectionService(P2PService p2PService,
Config config,
@ -201,12 +210,6 @@ public final class XmrConnectionService {
return connectionManager.getConnections();
}
public void switchToBestConnection() {
if (isFixedConnection() || !connectionManager.getAutoSwitch()) return;
MoneroRpcConnection bestConnection = getBestAvailableConnection();
if (bestConnection != null) setConnection(bestConnection);
}
public void setConnection(String connectionUri) {
accountService.checkAccountOpen();
connectionManager.setConnection(connectionUri); // listener will update connection list
@ -244,10 +247,67 @@ public final class XmrConnectionService {
public MoneroRpcConnection getBestAvailableConnection() {
accountService.checkAccountOpen();
List<MoneroRpcConnection> ignoredConnections = new ArrayList<MoneroRpcConnection>();
if (xmrLocalNode.shouldBeIgnored() && connectionManager.hasConnection(xmrLocalNode.getUri())) ignoredConnections.add(connectionManager.getConnectionByUri(xmrLocalNode.getUri()));
addLocalNodeIfIgnored(ignoredConnections);
return connectionManager.getBestAvailableConnection(ignoredConnections.toArray(new MoneroRpcConnection[0]));
}
private MoneroRpcConnection getBestAvailableConnection(Collection<MoneroRpcConnection> ignoredConnections) {
accountService.checkAccountOpen();
Set<MoneroRpcConnection> ignoredConnectionsSet = new HashSet<>(ignoredConnections);
addLocalNodeIfIgnored(ignoredConnectionsSet);
return connectionManager.getBestAvailableConnection(ignoredConnectionsSet.toArray(new MoneroRpcConnection[0]));
}
private void addLocalNodeIfIgnored(Collection<MoneroRpcConnection> ignoredConnections) {
if (xmrLocalNode.shouldBeIgnored() && connectionManager.hasConnection(xmrLocalNode.getUri())) ignoredConnections.add(connectionManager.getConnectionByUri(xmrLocalNode.getUri()));
}
private void switchToBestConnection() {
if (isFixedConnection() || !connectionManager.getAutoSwitch()) {
log.info("Skipping switch to best Monero connection because connection is fixed or auto switch is disabled");
return;
}
MoneroRpcConnection bestConnection = getBestAvailableConnection();
if (bestConnection != null) setConnection(bestConnection);
}
public boolean requestSwitchToNextBestConnection() {
log.warn("Request made to switch to next best monerod, current monerod={}", getConnection() == null ? null : getConnection().getUri());
// skip if connection is fixed
if (isFixedConnection() || !connectionManager.getAutoSwitch()) {
log.info("Skipping switch to next best Monero connection because connection is fixed or auto switch is disabled");
return false;
}
// skip if last switch was too recent
boolean skipSwitch = System.currentTimeMillis() - lastSwitchRequestTimestamp < SKIP_SWITCH_WITHIN_MS;
lastSwitchRequestTimestamp = System.currentTimeMillis();
if (skipSwitch) {
log.warn("Skipping switch to next best Monero connection because last switch was less than {} seconds ago", SKIP_SWITCH_WITHIN_MS / 1000);
lastSwitchRequestTimestamp = System.currentTimeMillis();
return false;
}
// try to get connection to switch to
MoneroRpcConnection currentConnection = getConnection();
if (currentConnection != null) excludedConnections.add(currentConnection);
MoneroRpcConnection bestConnection = getBestAvailableConnection(excludedConnections);
// remove from excluded connections after period
UserThread.runAfter(() -> {
if (currentConnection != null) excludedConnections.remove(currentConnection);
}, EXCLUDE_CONNECTION_SECONDS);
// switch to best connection
if (bestConnection == null) {
log.warn("Could not get connection to switch to");
return false;
}
setConnection(bestConnection);
return true;
}
public void setAutoSwitch(boolean autoSwitch) {
accountService.checkAccountOpen();
connectionManager.setAutoSwitch(autoSwitch);
@ -505,7 +565,6 @@ public final class XmrConnectionService {
// register connection listener
connectionManager.addListener(this::onConnectionChanged);
isInitialized = true;
}

View file

@ -1057,6 +1057,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} catch (Exception e) {
log.warn("Error creating split output tx to fund offer {} at subaddress {}, attempt={}/{}, error={}", openOffer.getShortId(), entry.getSubaddressIndex(), i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage());
if (stopped || i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (xmrConnectionService.isConnected()) xmrWalletService.requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}

View file

@ -89,6 +89,7 @@ public class MakerReserveOfferFunds extends Task<PlaceOfferModel> {
log.warn("Error creating reserve tx, attempt={}/{}, offerId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, openOffer.getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
model.getProtocol().startTimeoutTimer(); // reset protocol timeout
if (model.getXmrWalletService().getConnectionService().isConnected()) model.getXmrWalletService().requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}

View file

@ -478,6 +478,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
if (trade.isPayoutPublished()) throw new IllegalStateException("Payout tx already published for " + trade.getClass().getSimpleName() + " " + trade.getShortId());
log.warn("Failed to submit dispute payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, trade.getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (trade.getXmrConnectionService().isConnected()) trade.requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}

View file

@ -141,6 +141,7 @@ public abstract class Trade implements Tradable, Model {
private static final long SHUTDOWN_TIMEOUT_MS = 60000;
private static final long SYNC_EVERY_NUM_BLOCKS = 360; // ~1/2 day
private static final long DELETE_AFTER_NUM_BLOCKS = 2; // if deposit requested but not published
private static final long EXTENDED_RPC_TIMEOUT = 600000; // 10 minutes
private static final long DELETE_AFTER_MS = TradeProtocol.TRADE_STEP_TIMEOUT_SECONDS;
private final Object walletLock = new Object();
private final Object pollLock = new Object();
@ -626,10 +627,8 @@ public abstract class Trade implements Tradable, Model {
// handle connection change on dedicated thread
xmrConnectionService.addConnectionListener(connection -> {
ThreadUtils.submitToPool(() -> { // TODO: remove this?
ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId());
});
});
// reset buyer's payment sent state if no ack receive
if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) {
@ -847,6 +846,14 @@ public abstract class Trade implements Tradable, Model {
}
}
public boolean requestSwitchToNextBestConnection() {
if (xmrConnectionService.requestSwitchToNextBestConnection()) {
onConnectionChanged(xmrConnectionService.getConnection()); // change connection on same thread
return true;
}
return false;
}
public boolean isIdling() {
return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && pollNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden
}
@ -884,69 +891,8 @@ public abstract class Trade implements Tradable, Model {
}).start();
}
public void importMultisigHex() {
synchronized (walletLock) {
synchronized (HavenoUtils.getDaemonLock()) { // lock on daemon because import calls full refresh
for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) {
try {
doImportMultisigHex();
break;
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
log.warn("Failed to import multisig hex, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}
}
}
}
private void doImportMultisigHex() {
// ensure wallet sees deposits confirmed
if (!isDepositsConfirmed()) syncAndPollWallet();
// collect multisig hex from peers
List<String> multisigHexes = new ArrayList<String>();
for (TradePeer peer : getOtherPeers()) if (peer.getUpdatedMultisigHex() != null) multisigHexes.add(peer.getUpdatedMultisigHex());
// import multisig hex
log.info("Importing multisig hexes for {} {}, count={}", getClass().getSimpleName(), getShortId(), multisigHexes.size());
long startTime = System.currentTimeMillis();
if (!multisigHexes.isEmpty()) {
try {
wallet.importMultisigHex(multisigHexes.toArray(new String[0]));
} catch (MoneroError e) {
// import multisig hex individually if one is invalid
if (isInvalidImportError(e.getMessage())) {
log.warn("Peer has invalid multisig hex for {} {}, importing individually", getClass().getSimpleName(), getShortId());
boolean imported = false;
Exception lastError = null;
for (TradePeer peer : getOtherPeers()) {
if (peer.getUpdatedMultisigHex() == null) continue;
try {
wallet.importMultisigHex(peer.getUpdatedMultisigHex());
imported = true;
} catch (MoneroError e2) {
lastError = e2;
if (isInvalidImportError(e2.getMessage())) {
log.warn("{} has invalid multisig hex for {} {}, error={}, multisigHex={}", getPeerRole(peer), getClass().getSimpleName(), getShortId(), e2.getMessage(), peer.getUpdatedMultisigHex());
} else {
throw e2;
}
}
}
if (!imported) throw new IllegalArgumentException("Could not import any multisig hexes for " + getClass().getSimpleName() + " " + getShortId(), lastError);
} else {
throw e;
}
}
requestSaveWallet();
}
log.info("Done importing multisig hexes for {} {} in {} ms, count={}", getClass().getSimpleName(), getShortId(), System.currentTimeMillis() - startTime, multisigHexes.size());
private boolean isReadTimeoutError(String errMsg) {
return errMsg.contains("Read timed out");
}
// TODO: checking error strings isn't robust, but the library doesn't provide a way to check if multisig hex is invalid. throw IllegalArgumentException from library on invalid multisig hex?
@ -962,7 +908,13 @@ public abstract class Trade implements Tradable, Model {
}
public void requestSaveWallet() {
ThreadUtils.submitToPool(() -> saveWallet()); // save wallet off main thread
// save wallet off main thread
ThreadUtils.execute(() -> {
synchronized (walletLock) {
if (walletExists()) saveWallet();
}
}, getId());
}
public void saveWallet() {
@ -1109,6 +1061,104 @@ public abstract class Trade implements Tradable, Model {
}
}
public void importMultisigHex() {
synchronized (walletLock) {
synchronized (HavenoUtils.getDaemonLock()) { // lock on daemon because import calls full refresh
for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) {
try {
doImportMultisigHex();
break;
} catch (IllegalArgumentException | IllegalStateException e) {
throw e;
} catch (Exception e) {
log.warn("Failed to import multisig hex, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection();
if (isReadTimeoutError(e.getMessage())) forceRestartTradeWallet(); // wallet can be stuck a while
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}
}
}
}
private void doImportMultisigHex() {
// ensure wallet sees deposits confirmed
if (!isDepositsConfirmed()) syncAndPollWallet();
// collect multisig hex from peers
List<String> multisigHexes = new ArrayList<String>();
for (TradePeer peer : getOtherPeers()) if (peer.getUpdatedMultisigHex() != null) multisigHexes.add(peer.getUpdatedMultisigHex());
// import multisig hex
log.info("Importing multisig hexes for {} {}, count={}", getClass().getSimpleName(), getShortId(), multisigHexes.size());
long startTime = System.currentTimeMillis();
if (!multisigHexes.isEmpty()) {
try {
wallet.importMultisigHex(multisigHexes.toArray(new String[0]));
// check if import is still needed // TODO: we once received a multisig hex which was too short, causing import to still be needed
if (wallet.isMultisigImportNeeded()) {
String errorMessage = "Multisig import still needed for " + getClass().getSimpleName() + " " + getShortId() + " after already importing, multisigHexes=" + multisigHexes;
log.warn(errorMessage);
// ignore multisig hex which is significantly shorter than others
int maxLength = 0;
boolean removed = false;
for (String hex : multisigHexes) maxLength = Math.max(maxLength, hex.length());
for (String hex : new ArrayList<>(multisigHexes)) {
if (hex.length() < maxLength / 2) {
String ignoringMessage = "Ignoring multisig hex from " + getMultisigHexRole(hex) + " for " + getClass().getSimpleName() + " " + getShortId() + " because it is too short, multisigHex=" + hex;
setErrorMessage(ignoringMessage);
log.warn(ignoringMessage);
multisigHexes.remove(hex);
removed = true;
}
}
// re-import valid multisig hexes
if (removed) wallet.importMultisigHex(multisigHexes.toArray(new String[0]));
if (wallet.isMultisigImportNeeded()) throw new IllegalStateException(errorMessage);
}
} catch (MoneroError e) {
// import multisig hex individually if one is invalid
if (isInvalidImportError(e.getMessage())) {
log.warn("Peer has invalid multisig hex for {} {}, importing individually", getClass().getSimpleName(), getShortId());
boolean imported = false;
Exception lastError = null;
for (TradePeer peer : getOtherPeers()) {
if (peer.getUpdatedMultisigHex() == null) continue;
try {
wallet.importMultisigHex(peer.getUpdatedMultisigHex());
imported = true;
} catch (MoneroError e2) {
lastError = e2;
if (isInvalidImportError(e2.getMessage())) {
log.warn("{} has invalid multisig hex for {} {}, error={}, multisigHex={}", getPeerRole(peer), getClass().getSimpleName(), getShortId(), e2.getMessage(), peer.getUpdatedMultisigHex());
} else {
throw e2;
}
}
}
if (!imported) throw new IllegalArgumentException("Could not import any multisig hexes for " + getClass().getSimpleName() + " " + getShortId(), lastError);
} else {
throw e;
}
}
requestSaveWallet();
}
log.info("Done importing multisig hexes for {} {} in {} ms, count={}", getClass().getSimpleName(), getShortId(), System.currentTimeMillis() - startTime, multisigHexes.size());
}
private String getMultisigHexRole(String multisigHex) {
if (multisigHex.equals(getArbitrator().getUpdatedMultisigHex())) return "arbitrator";
if (multisigHex.equals(getBuyer().getUpdatedMultisigHex())) return "buyer";
if (multisigHex.equals(getSeller().getUpdatedMultisigHex())) return "seller";
throw new IllegalArgumentException("Multisig hex does not belong to any peer");
}
/**
* Create the payout tx.
*
@ -1125,9 +1175,12 @@ public abstract class Trade implements Tradable, Model {
for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) {
try {
return doCreatePayoutTx();
} catch (IllegalArgumentException | IllegalStateException e) {
throw e;
} catch (Exception e) {
log.warn("Failed to create payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}
@ -1139,14 +1192,10 @@ public abstract class Trade implements Tradable, Model {
private MoneroTxWallet doCreatePayoutTx() {
// check if multisig import needed
if (wallet.isMultisigImportNeeded()) throw new RuntimeException("Cannot create payout tx because multisig import is needed");
if (wallet.isMultisigImportNeeded()) throw new IllegalStateException("Cannot create payout tx because multisig import is needed for " + getClass().getSimpleName() + " " + getShortId());
// TODO: wallet sometimes returns empty data, after disconnect?
List<MoneroTxWallet> txs = wallet.getTxs(); // TODO: this fetches from pool
if (txs.isEmpty()) {
log.warn("Restarting wallet for {} {} because deposit txs are missing to create payout tx", getClass().getSimpleName(), getId());
forceRestartTradeWallet();
}
// recover if missing wallet data
recoverIfMissingWalletData();
// gather info
String sellerPayoutAddress = getSeller().getPayoutAddressString();
@ -1184,11 +1233,15 @@ public abstract class Trade implements Tradable, Model {
synchronized (HavenoUtils.getWalletFunctionLock()) {
for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) {
try {
if (wallet.isMultisigImportNeeded()) throw new IllegalStateException("Cannot create dispute payout tx because multisig import is needed for " + getClass().getSimpleName() + " " + getShortId());
return createTx(txConfig);
} catch (IllegalArgumentException | IllegalStateException e) {
throw e;
} catch (Exception e) {
if (e.getMessage().contains("not possible")) throw new RuntimeException("Loser payout is too small to cover the mining fee");
if (e.getMessage().contains("not possible")) throw new IllegalArgumentException("Loser payout is too small to cover the mining fee");
log.warn("Failed to create dispute payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}
@ -1205,23 +1258,30 @@ public abstract class Trade implements Tradable, Model {
* @param publish publishes the signed payout tx if true
*/
public void processPayoutTx(String payoutTxHex, boolean sign, boolean publish) {
log.info("Processing payout tx for {} {}", getClass().getSimpleName(), getId());
// TODO: wallet sometimes returns empty data, after disconnect? detect this condition with failure tolerance
synchronized (walletLock) {
synchronized (HavenoUtils.getWalletFunctionLock()) {
for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) {
try {
List<MoneroTxWallet> txs = wallet.getTxs(); // TODO: this fetches from pool
if (txs.isEmpty()) {
log.warn("Restarting wallet for {} {} because deposit txs are missing to process payout tx", getClass().getSimpleName(), getId());
forceRestartTradeWallet();
}
doProcessPayoutTx(payoutTxHex, sign, publish);
break;
} catch (IllegalArgumentException | IllegalStateException e) {
throw e;
} catch (Exception e) {
log.warn("Failed get wallet txs, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
log.warn("Failed to process payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}
}
}
}
private void doProcessPayoutTx(String payoutTxHex, boolean sign, boolean publish) {
log.info("Processing payout tx for {} {}", getClass().getSimpleName(), getId());
// recover if missing wallet data
recoverIfMissingWalletData();
// gather relevant info
MoneroWallet wallet = getWallet();
@ -1234,6 +1294,7 @@ public abstract class Trade implements Tradable, Model {
MoneroTxSet describedTxSet = wallet.describeTxSet(new MoneroTxSet().setMultisigTxHex(payoutTxHex));
if (describedTxSet.getTxs() == null || describedTxSet.getTxs().size() != 1) throw new IllegalArgumentException("Bad payout tx"); // TODO (woodser): test nack
MoneroTxWallet payoutTx = describedTxSet.getTxs().get(0);
if (payoutTxId == null) updatePayout(payoutTx); // update payout tx if not signed
// verify payout tx has exactly 2 destinations
if (payoutTx.getOutgoingTransfer() == null || payoutTx.getOutgoingTransfer().getDestinations() == null || payoutTx.getOutgoingTransfer().getDestinations().size() != 2) throw new IllegalArgumentException("Payout tx does not have exactly two destinations");
@ -1265,10 +1326,11 @@ public abstract class Trade implements Tradable, Model {
if (!sellerPayoutDestination.getAmount().equals(expectedSellerPayout)) throw new IllegalArgumentException("Seller destination amount is not deposit amount - trade amount - 1/2 tx costs, " + sellerPayoutDestination.getAmount() + " vs " + expectedSellerPayout);
// check connection
if (sign || publish) verifyDaemonConnection();
boolean doSign = sign && getPayoutTxHex() == null;
if (doSign || publish) verifyDaemonConnection();
// handle tx signing
if (sign) {
if (doSign) {
// sign tx
try {
@ -1283,6 +1345,7 @@ public abstract class Trade implements Tradable, Model {
// describe result
describedTxSet = wallet.describeMultisigTxSet(payoutTxHex);
payoutTx = describedTxSet.getTxs().get(0);
updatePayout(payoutTx);
// verify fee is within tolerance by recreating payout tx
// TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated?
@ -1294,22 +1357,16 @@ public abstract class Trade implements Tradable, Model {
log.info("Payout tx fee {} is within tolerance, diff %={}", payoutTx.getFee(), feeDiff);
}
// update trade state
updatePayout(payoutTx);
// save trade state
requestPersistence();
// submit payout tx
if (publish) {
for (int i = 0; i < TradeProtocol.MAX_ATTEMPTS; i++) {
try {
wallet.submitMultisigTxHex(payoutTxHex);
ThreadUtils.submitToPool(() -> pollWallet());
break;
setPayoutStatePublished();
} catch (Exception e) {
log.warn("Failed to submit payout tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
throw new RuntimeException("Failed to submit payout tx for " + getClass().getSimpleName() + " " + getId(), e);
}
}
}
@ -2244,10 +2301,6 @@ public abstract class Trade implements Tradable, Model {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private String getConnectionChangedThreadId() {
return getId() + ".onConnectionChanged";
}
// lazy initialization
private ObjectProperty<BigInteger> getAmountProperty() {
if (tradeAmountProperty == null)
@ -2263,6 +2316,10 @@ public abstract class Trade implements Tradable, Model {
return tradeVolumeProperty;
}
private String getConnectionChangedThreadId() {
return getId() + ".onConnectionChanged";
}
private void onConnectionChanged(MoneroRpcConnection connection) {
synchronized (walletLock) {
@ -2355,11 +2412,11 @@ public abstract class Trade implements Tradable, Model {
}
private void setPollPeriod(long pollPeriodMs) {
synchronized (walletLock) {
synchronized (pollLock) {
if (this.isShutDownStarted) return;
if (this.pollPeriodMs != null && this.pollPeriodMs == pollPeriodMs) return;
this.pollPeriodMs = pollPeriodMs;
if (isPollInProgress()) {
if (isPolling()) {
stopPolling();
startPolling();
}
@ -2372,8 +2429,8 @@ public abstract class Trade implements Tradable, Model {
}
private void startPolling() {
synchronized (walletLock) {
if (isShutDownStarted || isPollInProgress()) return;
synchronized (pollLock) {
if (isShutDownStarted || isPolling()) return;
updatePollPeriod();
log.info("Starting to poll wallet for {} {}", getClass().getSimpleName(), getId());
pollLooper = new TaskLooper(() -> pollWallet());
@ -2382,28 +2439,32 @@ public abstract class Trade implements Tradable, Model {
}
private void stopPolling() {
synchronized (walletLock) {
if (isPollInProgress()) {
synchronized (pollLock) {
if (isPolling()) {
pollLooper.stop();
pollLooper = null;
}
}
}
private boolean isPollInProgress() {
synchronized (walletLock) {
private boolean isPolling() {
synchronized (pollLock) {
return pollLooper != null;
}
}
private void pollWallet() {
synchronized (pollLock) {
if (pollInProgress) return;
}
doPollWallet();
}
private void doPollWallet() {
if (isShutDownStarted) return;
synchronized (pollLock) {
pollInProgress = true;
}
try {
// skip if payout unlocked
@ -2469,11 +2530,7 @@ public abstract class Trade implements Tradable, Model {
// rescan spent outputs to detect unconfirmed payout tx
if (isPayoutExpected && wallet.getBalance().compareTo(BigInteger.ZERO) > 0) {
try {
wallet.rescanSpent();
} catch (Exception e) {
log.warn("Error rescanning spent outputs to detect payout tx for {} {}, errorMessage={}", getClass().getSimpleName(), getShortId(), e.getMessage());
}
}
// get txs from trade wallet
@ -2523,12 +2580,15 @@ public abstract class Trade implements Tradable, Model {
boolean isWalletConnected = isWalletConnectedToDaemon();
if (!isShutDownStarted && wallet != null && isWalletConnected) {
log.warn("Error polling trade wallet for {} {}, errorMessage={}. Monerod={}", getClass().getSimpleName(), getShortId(), e.getMessage(), getXmrWalletService().getConnectionService().getConnection());
requestSwitchToNextBestConnection();
//e.printStackTrace();
}
}
} finally {
synchronized (pollLock) {
pollInProgress = false;
}
requestSaveWallet();
}
}
@ -2553,6 +2613,70 @@ public abstract class Trade implements Tradable, Model {
depositTxsUpdateCounter.set(depositTxsUpdateCounter.get() + 1);
}
// TODO: wallet is sometimes missing balance or deposits, due to specific daemon connections, not saving?
private void recoverIfMissingWalletData() {
synchronized (walletLock) {
if (isWalletMissingData()) {
log.warn("Wallet is missing data for {} {}, attempting to recover", getClass().getSimpleName(), getShortId());
// force restart wallet
forceRestartTradeWallet();
// rescan blockchain with global daemon lock
synchronized (HavenoUtils.getDaemonLock()) {
Long timeout = null;
try {
// extend rpc timeout for rescan
if (wallet instanceof MoneroWalletRpc) {
timeout = ((MoneroWalletRpc) wallet).getRpcConnection().getTimeout();
((MoneroWalletRpc) wallet).getRpcConnection().setTimeout(EXTENDED_RPC_TIMEOUT);
}
// rescan blockchain
log.warn("Rescanning blockchain for {} {}", getClass().getSimpleName(), getShortId());
wallet.rescanBlockchain();
} catch (Exception e) {
if (isReadTimeoutError(e.getMessage())) forceRestartTradeWallet(); // wallet can be stuck a while
throw e;
} finally {
// restore rpc timeout
if (wallet instanceof MoneroWalletRpc) {
((MoneroWalletRpc) wallet).getRpcConnection().setTimeout(timeout);
}
}
}
// import multisig hex
log.warn("Importing multisig hex to recover wallet data for {} {}", getClass().getSimpleName(), getShortId());
importMultisigHex();
}
}
// check again after releasing lock
if (isWalletMissingData()) throw new IllegalStateException("Wallet is still missing data after attempting recovery for " + getClass().getSimpleName() + " " + getShortId());
}
private boolean isWalletMissingData() {
synchronized (walletLock) {
if (!isDepositsUnlocked() || isPayoutPublished()) return false;
if (getMakerDepositTx() == null) {
log.warn("Missing maker deposit tx for {} {}", getClass().getSimpleName(), getId());
return true;
}
if (getTakerDepositTx() == null) {
log.warn("Missing taker deposit tx for {} {}", getClass().getSimpleName(), getId());
return true;
}
if (wallet.getBalance().equals(BigInteger.ZERO)) {
log.warn("Wallet balance is zero for {} {}", getClass().getSimpleName(), getId());
return true;
}
return false;
}
}
private void forceRestartTradeWallet() {
if (isShutDownStarted || restartInProgress) return;
log.warn("Force restarting trade wallet for {} {}", getClass().getSimpleName(), getId());
@ -2560,7 +2684,7 @@ public abstract class Trade implements Tradable, Model {
forceCloseWallet();
if (!isShutDownStarted) wallet = getWallet();
restartInProgress = false;
doPollWallet();
pollWallet();
if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId());
}

View file

@ -105,6 +105,7 @@ public class MaybeSendSignContractRequest extends TradeTask {
} catch (Exception e) {
log.warn("Error creating deposit tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, trade.getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (trade.getXmrConnectionService().isConnected()) trade.getXmrWalletService().requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}

View file

@ -71,6 +71,7 @@ public class TakerReserveTradeFunds extends TradeTask {
} catch (Exception e) {
log.warn("Error creating reserve tx, attempt={}/{}, tradeId={}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, trade.getShortId(), e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (trade.getXmrConnectionService().isConnected()) trade.getXmrWalletService().requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}

View file

@ -35,6 +35,8 @@
package haveno.core.xmr;
import com.google.inject.Inject;
import haveno.common.ThreadUtils;
import haveno.common.UserThread;
import haveno.core.api.model.XmrBalanceInfo;
import haveno.core.offer.OpenOffer;
@ -103,7 +105,7 @@ public class Balances {
updateBalances();
}
});
updateBalances();
doUpdateBalances();
}
public XmrBalanceInfo getBalances() {
@ -117,7 +119,12 @@ public class Balances {
}
private void updateBalances() {
ThreadUtils.submitToPool(() -> doUpdateBalances());
}
private void doUpdateBalances() {
synchronized (this) {
synchronized (XmrWalletService.WALLET_LOCK) {
// get wallet balances
BigInteger balance = xmrWalletService.getWallet() == null ? BigInteger.ZERO : xmrWalletService.getBalance();
@ -156,3 +163,4 @@ public class Balances {
}
}
}
}

View file

@ -24,6 +24,7 @@ import com.google.inject.name.Named;
import common.utils.JsonUtils;
import haveno.common.ThreadUtils;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.file.FileUtil;
@ -67,6 +68,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javafx.beans.property.LongProperty;
@ -155,14 +157,16 @@ public class XmrWalletService {
private TradeManager tradeManager;
private MoneroWallet wallet;
public static final Object WALLET_LOCK = new Object();
private boolean wasWalletSynced = false;
private boolean wasWalletSynced;
private final Map<String, Optional<MoneroTx>> txCache = new HashMap<String, Optional<MoneroTx>>();
private boolean isClosingWallet = false;
private boolean isShutDownStarted = false;
private boolean isClosingWallet;
private boolean isShutDownStarted;
private ExecutorService syncWalletThreadPool = Executors.newFixedThreadPool(10); // TODO: adjust based on connection type
private Long syncStartHeight = null;
private TaskLooper syncWithProgressLooper = null;
CountDownLatch syncWithProgressLatch;
private Long syncStartHeight;
private TaskLooper syncProgressLooper;
private CountDownLatch syncProgressLatch;
private Timer syncProgressTimeout;
private static final int SYNC_PROGRESS_TIMEOUT_SECONDS = 45;
// wallet polling and cache
private TaskLooper pollLooper;
@ -933,7 +937,7 @@ public class XmrWalletService {
e.printStackTrace();
// force close wallet
forceCloseWallet(wallet, getWalletPath(MONERO_WALLET_NAME));
forceCloseMainWallet();
}
log.info("Done shutting down {}", getClass().getSimpleName());
@ -1281,22 +1285,7 @@ public class XmrWalletService {
else log.info(appliedMsg);
// listen for connection changes
xmrConnectionService.addConnectionListener(connection -> {
// force restart main wallet if connection changed before synced
if (!wasWalletSynced) {
if (!Boolean.TRUE.equals(xmrConnectionService.isConnected())) return;
ThreadUtils.submitToPool(() -> {
log.warn("Force restarting main wallet because connection changed before inital sync");
forceRestartMainWallet();
});
return;
} else {
// apply connection changes
ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID);
}
});
xmrConnectionService.addConnectionListener(connection -> ThreadUtils.execute(() -> onConnectionChanged(connection), THREAD_ID));
// initialize main wallet when daemon synced
walletInitListener = (obs, oldVal, newVal) -> initMainWalletIfConnected();
@ -1305,28 +1294,17 @@ public class XmrWalletService {
}
private void initMainWalletIfConnected() {
ThreadUtils.execute(() -> {
synchronized (WALLET_LOCK) {
if (wallet == null && xmrConnectionService.downloadPercentageProperty().get() == 1 && !isShutDownStarted) {
maybeInitMainWallet(true);
if (walletInitListener != null) xmrConnectionService.downloadPercentageProperty().removeListener(walletInitListener);
}
}
}, THREAD_ID);
}
private void maybeInitMainWallet(boolean sync) {
try {
maybeInitMainWallet(sync, MAX_SYNC_ATTEMPTS);
} catch (Exception e) {
log.warn("Error initializing main wallet: " + e.getMessage());
e.printStackTrace();
HavenoUtils.havenoSetup.getTopErrorMsg().set(e.getMessage());
throw e;
}
}
private void maybeInitMainWallet(boolean sync, int numAttempts) {
ThreadUtils.execute(() -> {
synchronized (WALLET_LOCK) {
if (isShutDownStarted) return;
@ -1355,12 +1333,21 @@ public class XmrWalletService {
if (sync && numAttempts > 0) {
try {
// switch connection if disconnected
if (!wallet.isConnectedToDaemon()) {
log.warn("Switching connection before syncing with progress because disconnected");
if (requestSwitchToNextBestConnection()) return; // calls back to this method
}
// sync main wallet
log.info("Syncing main wallet");
long time = System.currentTimeMillis();
syncWithProgress(); // blocking
log.info("Done syncing main wallet in " + (System.currentTimeMillis() - time) + " ms");
// poll wallet
doPollWallet(true);
if (walletInitListener != null) xmrConnectionService.downloadPercentageProperty().removeListener(walletInitListener);
// log wallet balances
if (getMoneroNetworkType() != MoneroNetworkType.MAINNET) {
@ -1369,8 +1356,8 @@ public class XmrWalletService {
log.info("Monero wallet unlocked balance={}, pending balance={}, total balance={}", unlockedBalance, balance.subtract(unlockedBalance), balance);
}
// reapply connection after wallet synced
onConnectionChanged(xmrConnectionService.getConnection());
// reapply connection after wallet synced (might reinitialize wallet on new thread)
ThreadUtils.execute(() -> onConnectionChanged(xmrConnectionService.getConnection()), THREAD_ID);
// reset internal state if main wallet was swapped
resetIfWalletChanged();
@ -1395,12 +1382,12 @@ public class XmrWalletService {
// reschedule to init main wallet
UserThread.runAfter(() -> {
ThreadUtils.execute(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID);
maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
} else {
log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000);
UserThread.runAfter(() -> {
ThreadUtils.execute(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID);
maybeInitMainWallet(true, numAttempts - 1);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
}
}
@ -1410,6 +1397,7 @@ public class XmrWalletService {
startPolling();
}
}
}, THREAD_ID);
}
private void resetIfWalletChanged() {
@ -1431,6 +1419,9 @@ public class XmrWalletService {
private void syncWithProgress() {
// start sync progress timeout
resetSyncProgressTimeout();
// show sync progress
updateSyncProgress(wallet.getHeight());
@ -1458,8 +1449,8 @@ public class XmrWalletService {
// poll wallet for progress
wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs());
syncWithProgressLatch = new CountDownLatch(1);
syncWithProgressLooper = new TaskLooper(() -> {
syncProgressLatch = new CountDownLatch(1);
syncProgressLooper = new TaskLooper(() -> {
if (wallet == null) return;
long height = 0;
try {
@ -1470,29 +1461,22 @@ public class XmrWalletService {
}
if (height < xmrConnectionService.getTargetHeight()) updateSyncProgress(height);
else {
syncWithProgressLooper.stop();
syncProgressLooper.stop();
wasWalletSynced = true;
updateSyncProgress(height);
syncWithProgressLatch.countDown();
syncProgressLatch.countDown();
}
});
syncWithProgressLooper.start(1000);
HavenoUtils.awaitLatch(syncWithProgressLatch);
syncProgressLooper.start(1000);
HavenoUtils.awaitLatch(syncProgressLatch);
wallet.stopSyncing();
if (!wasWalletSynced) throw new IllegalStateException("Failed to sync wallet with progress");
}
private void stopSyncWithProgress() {
if (syncWithProgressLooper != null) {
syncWithProgressLooper.stop();
syncWithProgressLooper = null;
syncWithProgressLatch.countDown();
}
}
private void updateSyncProgress(long height) {
UserThread.execute(() -> {
walletHeight.set(height);
resetSyncProgressTimeout();
// new wallet reports height 1 before synced
if (height == 1) {
@ -1509,6 +1493,18 @@ public class XmrWalletService {
});
}
private synchronized void resetSyncProgressTimeout() {
if (syncProgressTimeout != null) syncProgressTimeout.stop();
syncProgressTimeout = UserThread.runAfter(() -> {
if (isShutDownStarted || wasWalletSynced) return;
log.warn("Sync progress timeout called");
forceCloseMainWallet();
requestSwitchToNextBestConnection();
maybeInitMainWallet(true);
resetSyncProgressTimeout();
}, SYNC_PROGRESS_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
private MoneroWalletFull createWalletFull(MoneroWalletConfig config) {
// must be connected to daemon
@ -1545,7 +1541,7 @@ public class XmrWalletService {
// open wallet
config.setNetworkType(getMoneroNetworkType());
config.setServer(connection);
log.info("Opening full wallet " + config.getPath() + " with monerod=" + connection.getUri());
log.info("Opening full wallet " + config.getPath() + " with monerod=" + connection.getUri() + ", proxyUri=" + connection.getProxyUri());
walletFull = MoneroWalletFull.openWallet(config);
if (walletFull.getDaemonConnection() != null) walletFull.getDaemonConnection().setPrintStackTrace(PRINT_RPC_STACK_TRACE);
log.info("Done opening full wallet " + config.getPath());
@ -1605,7 +1601,7 @@ public class XmrWalletService {
if (!applyProxyUri) connection.setProxyUri(null);
// open wallet
log.info("Opening RPC wallet " + config.getPath() + " with monerod=" + connection.getUri());
log.info("Opening RPC wallet " + config.getPath() + " with monerod=" + connection.getUri() + ", proxyUri=" + connection.getProxyUri());
config.setServer(connection);
walletRpc.openWallet(config);
if (walletRpc.getDaemonConnection() != null) walletRpc.getDaemonConnection().setPrintStackTrace(PRINT_RPC_STACK_TRACE);
@ -1667,20 +1663,37 @@ public class XmrWalletService {
String oldProxyUri = wallet == null || wallet.getDaemonConnection() == null ? null : wallet.getDaemonConnection().getProxyUri();
String newProxyUri = connection == null ? null : connection.getProxyUri();
log.info("Setting daemon connection for main wallet: uri={}, proxyUri={}", connection == null ? null : connection.getUri(), newProxyUri);
// force restart main wallet if connection changed before synced
if (!wasWalletSynced) {
if (!Boolean.TRUE.equals(xmrConnectionService.isConnected())) return;
log.warn("Force restarting main wallet because connection changed before inital sync");
forceRestartMainWallet();
return;
}
// update connection
if (wallet instanceof MoneroWalletRpc) {
if (StringUtils.equals(oldProxyUri, newProxyUri)) {
wallet.setDaemonConnection(connection);
} else {
log.info("Restarting main wallet because proxy URI has changed, old={}, new={}", oldProxyUri, newProxyUri);
log.info("Restarting main wallet because proxy URI has changed, old={}, new={}", oldProxyUri, newProxyUri); // TODO: set proxy without restarting wallet
closeMainWallet(true);
maybeInitMainWallet(false);
return; // wallet is re-initialized
}
} else {
wallet.setDaemonConnection(connection);
wallet.setProxyUri(connection.getProxyUri());
}
// sync wallet on new thread
// switch if wallet disconnected
if (Boolean.TRUE.equals(connection.isConnected() && !wallet.isConnectedToDaemon())) {
log.warn("Switching to next best connection because main wallet is disconnected");
if (requestSwitchToNextBestConnection()) return; // calls back to this method
}
// update poll period
if (connection != null && !isShutDownStarted) {
wallet.getDaemonConnection().setPrintStackTrace(PRINT_RPC_STACK_TRACE);
updatePollPeriod();
@ -1735,25 +1748,21 @@ public class XmrWalletService {
}
private void forceCloseMainWallet() {
stopPolling();
isClosingWallet = true;
forceCloseWallet(wallet, getWalletPath(MONERO_WALLET_NAME));
stopPolling();
stopSyncWithProgress();
wallet = null;
}
private void forceRestartMainWallet() {
log.warn("Force restarting main wallet");
forceCloseMainWallet();
synchronized (WALLET_LOCK) {
maybeInitMainWallet(true);
}
}
private void startPolling() {
synchronized (WALLET_LOCK) {
if (isShutDownStarted || isPollInProgress()) return;
log.info("Starting to poll main wallet");
if (isShutDownStarted || isPolling()) return;
updatePollPeriod();
pollLooper = new TaskLooper(() -> pollWallet());
pollLooper.start(pollPeriodMs);
@ -1761,13 +1770,13 @@ public class XmrWalletService {
}
private void stopPolling() {
if (isPollInProgress()) {
if (isPolling()) {
pollLooper.stop();
pollLooper = null;
}
}
private boolean isPollInProgress() {
private boolean isPolling() {
return pollLooper != null;
}
@ -1785,7 +1794,7 @@ public class XmrWalletService {
if (this.isShutDownStarted) return;
if (this.pollPeriodMs != null && this.pollPeriodMs == pollPeriodMs) return;
this.pollPeriodMs = pollPeriodMs;
if (isPollInProgress()) {
if (isPolling()) {
stopPolling();
startPolling();
}
@ -1793,14 +1802,17 @@ public class XmrWalletService {
}
private void pollWallet() {
synchronized (pollLock) {
if (pollInProgress) return;
}
doPollWallet(true);
}
private void doPollWallet(boolean updateTxs) {
synchronized (pollLock) {
if (isShutDownStarted) return;
pollInProgress = true;
}
if (isShutDownStarted) return;
try {
// skip if daemon not synced
@ -1817,7 +1829,7 @@ public class XmrWalletService {
// switch to best connection if wallet is too far behind
if (wasWalletSynced && walletHeight.get() < xmrConnectionService.getTargetHeight() - NUM_BLOCKS_BEHIND_TOLERANCE && !Config.baseCurrencyNetwork().isTestnet()) {
log.warn("Updating connection because main wallet is {} blocks behind monerod, wallet height={}, monerod height={}", xmrConnectionService.getTargetHeight() - walletHeight.get(), walletHeight.get(), lastInfo.getHeight());
xmrConnectionService.switchToBestConnection();
if (xmrConnectionService.isConnected()) requestSwitchToNextBestConnection();
}
// sync wallet if behind daemon
@ -1856,6 +1868,7 @@ public class XmrWalletService {
} finally {
// cache wallet info last
synchronized (WALLET_LOCK) {
if (wallet != null && !isShutDownStarted) {
try {
cacheWalletInfo();
@ -1863,6 +1876,9 @@ public class XmrWalletService {
e.printStackTrace();
}
}
}
synchronized (pollLock) {
pollInProgress = false;
}
}
@ -1887,6 +1903,10 @@ public class XmrWalletService {
}
}
public boolean requestSwitchToNextBestConnection() {
return xmrConnectionService.requestSwitchToNextBestConnection();
}
private void onNewBlock(long height) {
UserThread.execute(() -> {
walletHeight.set(height);

View file

@ -270,6 +270,7 @@ public class WithdrawalView extends ActivatableView<VBox, Void> {
if (isNotEnoughMoney(e.getMessage())) throw e;
log.warn("Error creating creating withdraw tx, attempt={}/{}, error={}", i + 1, TradeProtocol.MAX_ATTEMPTS, e.getMessage());
if (i == TradeProtocol.MAX_ATTEMPTS - 1) throw e;
if (xmrWalletService.getConnectionService().isConnected()) xmrWalletService.requestSwitchToNextBestConnection();
HavenoUtils.waitFor(TradeProtocol.REPROCESS_DELAY_MS); // wait before retrying
}
}