refactor wallet poll loops to further minimize requests

This commit is contained in:
woodser 2024-04-21 13:11:24 -04:00
parent 5c0d9a1ae5
commit 9d9635ff50
7 changed files with 140 additions and 139 deletions

View file

@ -268,7 +268,7 @@ public final class XmrConnectionService {
} }
public Long getTargetHeight() { public Long getTargetHeight() {
if (daemon == null || lastInfo == null) return null; if (lastInfo == null) return null;
return lastInfo.getTargetHeight() == 0 ? chainHeight.get() : lastInfo.getTargetHeight(); // monerod sync_info's target_height returns 0 when node is fully synced return lastInfo.getTargetHeight() == 0 ? chainHeight.get() : lastInfo.getTargetHeight(); // monerod sync_info's target_height returns 0 when node is fully synced
} }

View file

@ -195,7 +195,7 @@ public abstract class SupportManager {
for (Dispute dispute : trade.getDisputes()) { for (Dispute dispute : trade.getDisputes()) {
for (ChatMessage chatMessage : dispute.getChatMessages()) { for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) { if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
if (dispute.isClosed()) trade.syncWalletNormallyForMs(30000); // sync to check for payout if (dispute.isClosed()) trade.pollWalletNormallyForMs(30000); // sync to check for payout
else trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED); else trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED);
} }
} }

View file

@ -792,7 +792,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// the state, as that is displayed to the user and we only persist that msg // the state, as that is displayed to the user and we only persist that msg
disputeResult.getChatMessage().setArrived(true); disputeResult.getChatMessage().setArrived(true);
trade.advanceDisputeState(Trade.DisputeState.ARBITRATOR_SAW_ARRIVED_DISPUTE_CLOSED_MSG); trade.advanceDisputeState(Trade.DisputeState.ARBITRATOR_SAW_ARRIVED_DISPUTE_CLOSED_MSG);
trade.syncWalletNormallyForMs(30000); trade.pollWalletNormallyForMs(30000);
requestPersistence(); requestPersistence();
resultHandler.handleResult(); resultHandler.handleResult();
} }

View file

@ -40,7 +40,6 @@ import com.google.protobuf.Message;
import common.utils.GenUtils; import common.utils.GenUtils;
import haveno.common.ThreadUtils; import haveno.common.ThreadUtils;
import haveno.common.UserThread; import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.crypto.Encryption; import haveno.common.crypto.Encryption;
import haveno.common.crypto.PubKeyRing; import haveno.common.crypto.PubKeyRing;
import haveno.common.proto.ProtoUtil; import haveno.common.proto.ProtoUtil;
@ -91,7 +90,6 @@ import monero.common.MoneroError;
import monero.common.MoneroRpcConnection; import monero.common.MoneroRpcConnection;
import monero.common.TaskLooper; import monero.common.TaskLooper;
import monero.daemon.MoneroDaemon; import monero.daemon.MoneroDaemon;
import monero.daemon.model.MoneroDaemonInfo;
import monero.daemon.model.MoneroKeyImage; import monero.daemon.model.MoneroKeyImage;
import monero.daemon.model.MoneroTx; import monero.daemon.model.MoneroTx;
import monero.wallet.MoneroWallet; import monero.wallet.MoneroWallet;
@ -137,6 +135,7 @@ public abstract class Trade implements Tradable, Model {
private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_"; private static final String MONERO_TRADE_WALLET_PREFIX = "xmr_trade_";
private static final long SHUTDOWN_TIMEOUT_MS = 60000; private static final long SHUTDOWN_TIMEOUT_MS = 60000;
private static final long DELETE_BACKUPS_AFTER_NUM_BLOCKS = 3600; // ~5 days private static final long DELETE_BACKUPS_AFTER_NUM_BLOCKS = 3600; // ~5 days
private static final long SYNC_EVERY_NUM_BLOCKS = 360; // ~1/2 day
private final Object walletLock = new Object(); private final Object walletLock = new Object();
private final Object pollLock = new Object(); private final Object pollLock = new Object();
private MoneroWallet wallet; private MoneroWallet wallet;
@ -414,8 +413,8 @@ public abstract class Trade implements Tradable, Model {
transient private Subscription tradePhaseSubscription; transient private Subscription tradePhaseSubscription;
transient private Subscription payoutStateSubscription; transient private Subscription payoutStateSubscription;
transient private TaskLooper pollLooper; transient private TaskLooper pollLooper;
transient private Long walletRefreshPeriodMs; transient private Long pollPeriodMs;
transient private Long syncNormalStartTimeMs; transient private Long pollNormalStartTimeMs;
public static final long DEFER_PUBLISH_MS = 25000; // 25 seconds public static final long DEFER_PUBLISH_MS = 25000; // 25 seconds
private static final long IDLE_SYNC_PERIOD_MS = 1680000; // 28 minutes (monero's default connection timeout is 30 minutes on a local connection, so beyond this the wallets will disconnect) private static final long IDLE_SYNC_PERIOD_MS = 1680000; // 28 minutes (monero's default connection timeout is 30 minutes on a local connection, so beyond this the wallets will disconnect)
@ -638,7 +637,7 @@ public abstract class Trade implements Tradable, Model {
if (!isInitialized || isShutDownStarted) return; if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (newValue == Trade.State.MULTISIG_COMPLETED) { if (newValue == Trade.State.MULTISIG_COMPLETED) {
updateWalletRefreshPeriod(); updatePollPeriod();
startPolling(); startPolling();
} }
}, getId()); }, getId());
@ -648,7 +647,7 @@ public abstract class Trade implements Tradable, Model {
tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> {
if (!isInitialized || isShutDownStarted) return; if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); if (isDepositsPublished() && !isPayoutUnlocked()) updatePollPeriod();
if (isPaymentReceived()) { if (isPaymentReceived()) {
UserThread.execute(() -> { UserThread.execute(() -> {
if (tradePhaseSubscription != null) { if (tradePhaseSubscription != null) {
@ -664,7 +663,7 @@ public abstract class Trade implements Tradable, Model {
payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> {
if (!isInitialized || isShutDownStarted) return; if (!isInitialized || isShutDownStarted) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (isPayoutPublished()) updateWalletRefreshPeriod(); if (isPayoutPublished()) updatePollPeriod();
// handle when payout published // handle when payout published
if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) { if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) {
@ -724,7 +723,7 @@ public abstract class Trade implements Tradable, Model {
if (walletExists()) getWallet(); if (walletExists()) getWallet();
else { else {
MoneroTx payoutTx = getPayoutTx(); MoneroTx payoutTx = getPayoutTx();
if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) { if (payoutTx != null && payoutTx.getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK) {
log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState()); log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState());
setPayoutStateUnlocked(); setPayoutStateUnlocked();
return; return;
@ -734,7 +733,7 @@ public abstract class Trade implements Tradable, Model {
} }
// initialize syncing and polling // initialize syncing and polling
tryInitSyncing(); tryInitPolling();
} }
public void requestPersistence() { public void requestPersistence() {
@ -820,7 +819,7 @@ public abstract class Trade implements Tradable, Model {
} }
public boolean isIdling() { public boolean isIdling() {
return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && syncNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && pollNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden
} }
public boolean isSyncedWithinTolerance() { public boolean isSyncedWithinTolerance() {
@ -838,40 +837,38 @@ public abstract class Trade implements Tradable, Model {
syncWallet(true); syncWallet(true);
} }
public void syncWalletNormallyForMs(long syncNormalDuration) { public void pollWalletNormallyForMs(long pollNormalDuration) {
syncNormalStartTimeMs = System.currentTimeMillis(); pollNormalStartTimeMs = System.currentTimeMillis();
// override wallet refresh period // override wallet poll period
setWalletRefreshPeriod(xmrConnectionService.getRefreshPeriodMs()); setPollPeriod(xmrConnectionService.getRefreshPeriodMs());
// reset wallet refresh period after duration // reset wallet poll period after duration
new Thread(() -> { new Thread(() -> {
GenUtils.waitFor(syncNormalDuration); GenUtils.waitFor(pollNormalDuration);
Long syncNormalStartTimeMsCopy = syncNormalStartTimeMs; // copy to avoid race condition Long pollNormalStartTimeMsCopy = pollNormalStartTimeMs; // copy to avoid race condition
if (syncNormalStartTimeMsCopy == null) return; if (pollNormalStartTimeMsCopy == null) return;
if (!isShutDown && System.currentTimeMillis() >= syncNormalStartTimeMsCopy + syncNormalDuration) { if (!isShutDown && System.currentTimeMillis() >= pollNormalStartTimeMsCopy + pollNormalDuration) {
syncNormalStartTimeMs = null; pollNormalStartTimeMs = null;
updateWalletRefreshPeriod(); updatePollPeriod();
} }
}).start(); }).start();
// TODO: sync wallet because `auto_refresh` will not sync wallet until end of last sync period (which could be a long idle)
new Thread(() -> {
GenUtils.waitFor(1000);
if (!isShutDownStarted) trySyncWallet(true);
}).start();
} }
public void importMultisigHex() { public void importMultisigHex() {
synchronized (walletLock) { synchronized (walletLock) {
// ensure wallet sees deposits confirmed
if (!isDepositsConfirmed()) syncAndPollWallet();
// import multisig hexes
List<String> multisigHexes = new ArrayList<String>(); List<String> multisigHexes = new ArrayList<String>();
if (getBuyer().getUpdatedMultisigHex() != null) multisigHexes.add(getBuyer().getUpdatedMultisigHex()); for (TradePeer node : getAllTradeParties()) if (node.getUpdatedMultisigHex() != null) multisigHexes.add(node.getUpdatedMultisigHex());
if (getSeller().getUpdatedMultisigHex() != null) multisigHexes.add(getSeller().getUpdatedMultisigHex());
if (getArbitrator().getUpdatedMultisigHex() != null) multisigHexes.add(getArbitrator().getUpdatedMultisigHex());
if (!multisigHexes.isEmpty()) { if (!multisigHexes.isEmpty()) {
log.info("Importing multisig hex for {} {}", getClass().getSimpleName(), getId()); log.info("Importing multisig hex for {} {}", getClass().getSimpleName(), getId());
long startTime = System.currentTimeMillis();
getWallet().importMultisigHex(multisigHexes.toArray(new String[0])); getWallet().importMultisigHex(multisigHexes.toArray(new String[0]));
log.info("Done importing multisig hex for {} {}", getClass().getSimpleName(), getId()); log.info("Done importing multisig hex for {} {} in {} ms", getClass().getSimpleName(), getId(), System.currentTimeMillis() - startTime);
} }
requestSaveWallet(); requestSaveWallet();
} }
@ -917,7 +914,7 @@ public abstract class Trade implements Tradable, Model {
stopPolling(); stopPolling();
xmrWalletService.closeWallet(wallet, true); xmrWalletService.closeWallet(wallet, true);
wallet = null; wallet = null;
walletRefreshPeriodMs = null; pollPeriodMs = null;
} }
} }
@ -2014,26 +2011,25 @@ public abstract class Trade implements Tradable, Model {
// sync and reprocess messages on new thread // sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) { if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
ThreadUtils.execute(() -> tryInitSyncing(), getId()); ThreadUtils.execute(() -> tryInitPolling(), getId());
} }
} }
} }
private void tryInitPolling() {
private void tryInitSyncing() {
if (isShutDownStarted) return; if (isShutDownStarted) return;
if (!isIdling()) { if (!isIdling()) {
initSyncingAux(); tryInitPollingAux();
} else { } else {
long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getWalletRefreshPeriod()); // random time to start syncing long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getPollPeriod()); // random time to start polling
UserThread.runAfter(() -> ThreadUtils.execute(() -> { UserThread.runAfter(() -> ThreadUtils.execute(() -> {
if (!isShutDownStarted) initSyncingAux(); if (!isShutDownStarted) tryInitPollingAux();
}, getId()), startSyncingInMs / 1000l); }, getId()), startSyncingInMs / 1000l);
} }
} }
private void initSyncingAux() { private void tryInitPollingAux() {
if (!wasWalletSynced) trySyncWallet(false); if (!wasWalletSynced) trySyncWallet(false);
updateWalletRefreshPeriod(); updatePollPeriod();
// reprocess pending payout messages // reprocess pending payout messages
this.getProtocol().maybeReprocessPaymentReceivedMessage(false); this.getProtocol().maybeReprocessPaymentReceivedMessage(false);
@ -2070,20 +2066,16 @@ public abstract class Trade implements Tradable, Model {
if (pollWallet) pollWallet(); if (pollWallet) pollWallet();
} }
public void updateWalletRefreshPeriod() { public void updatePollPeriod() {
if (isShutDownStarted) return; if (isShutDownStarted) return;
setWalletRefreshPeriod(getWalletRefreshPeriod()); setPollPeriod(getPollPeriod());
} }
private void setWalletRefreshPeriod(long walletRefreshPeriodMs) { private void setPollPeriod(long pollPeriodMs) {
synchronized (walletLock) { synchronized (walletLock) {
if (this.isShutDownStarted) return; if (this.isShutDownStarted) return;
if (this.walletRefreshPeriodMs != null && this.walletRefreshPeriodMs == walletRefreshPeriodMs) return; if (this.pollPeriodMs != null && this.pollPeriodMs == pollPeriodMs) return;
this.walletRefreshPeriodMs = walletRefreshPeriodMs; this.pollPeriodMs = pollPeriodMs;
if (getWallet() != null) {
log.info("Setting wallet refresh rate for {} {} to {}", getClass().getSimpleName(), getId(), getWalletRefreshPeriod());
getWallet().startSyncing(getWalletRefreshPeriod()); // TODO (monero-project): wallet rpc waits until last sync period finishes before starting new sync period
}
if (isPollInProgress()) { if (isPollInProgress()) {
stopPolling(); stopPolling();
startPolling(); startPolling();
@ -2091,12 +2083,17 @@ public abstract class Trade implements Tradable, Model {
} }
} }
private long getPollPeriod() {
if (isIdling()) return IDLE_SYNC_PERIOD_MS;
return xmrConnectionService.getRefreshPeriodMs();
}
private void startPolling() { private void startPolling() {
synchronized (walletLock) { synchronized (walletLock) {
if (isShutDownStarted || isPollInProgress()) return; if (isShutDownStarted || isPollInProgress()) return;
log.info("Starting to poll wallet for {} {}", getClass().getSimpleName(), getId()); log.info("Starting to poll wallet for {} {}", getClass().getSimpleName(), getId());
pollLooper = new TaskLooper(() -> pollWallet()); pollLooper = new TaskLooper(() -> pollWallet());
pollLooper.start(walletRefreshPeriodMs); pollLooper.start(pollPeriodMs);
} }
} }
@ -2121,58 +2118,29 @@ public abstract class Trade implements Tradable, Model {
pollInProgress = true; pollInProgress = true;
try { try {
// log warning if wallet is too far behind daemon // skip if payout unlocked
MoneroDaemonInfo lastInfo = xmrConnectionService.getLastInfo(); if (isPayoutUnlocked()) return;
long walletHeight = wallet.getHeight();
int maxBlocksBehindWarning = 10;
if (wasWalletSynced && isDepositsPublished() && !isIdling() && lastInfo != null && walletHeight < lastInfo.getHeight() - maxBlocksBehindWarning && !Config.baseCurrencyNetwork().isTestnet()) {
log.warn("Wallet is more than {} blocks behind monerod for {} {}, wallet height={}, monerod height={},", maxBlocksBehindWarning, getClass().getSimpleName(), getShortId(), walletHeight, lastInfo.getHeight());
}
// skip if either deposit tx id is unknown // skip if either deposit tx id is unknown
if (processModel.getMaker().getDepositTxHash() == null || processModel.getTaker().getDepositTxHash() == null) return; if (processModel.getMaker().getDepositTxHash() == null || processModel.getTaker().getDepositTxHash() == null) return;
// skip if payout unlocked // sync if wallet too far behind daemon
if (isPayoutUnlocked()) return; if (xmrConnectionService.getTargetHeight() == null) return;
if (wallet.getHeight() < xmrConnectionService.getTargetHeight() - SYNC_EVERY_NUM_BLOCKS) syncWallet(false);
// rescan spent outputs to detect unconfirmed payout tx after payment received message // update deposit txs
if (!isPayoutPublished() && (hasPaymentReceivedMessage() || hasDisputeClosedMessage())) {
try {
wallet.rescanSpent();
} catch (Exception e) {
log.warn("Error rescanning spent outputs to detect payout tx for {} {}, errorMessage={}", getClass().getSimpleName(), getShortId(), e.getMessage());
}
}
// get txs from trade wallet
boolean payoutExpected = isPaymentReceived() || getSeller().getPaymentReceivedMessage() != null || disputeState.ordinal() >= DisputeState.ARBITRATOR_SENT_DISPUTE_CLOSED_MSG.ordinal() || getArbitrator().getDisputeClosedMessage() != null;
boolean checkPool = !isDepositsConfirmed() || (!isPayoutConfirmed() && payoutExpected);
MoneroTxQuery query = new MoneroTxQuery().setIncludeOutputs(true);
if (!checkPool) query.setInTxPool(false); // avoid pool check if possible
List<MoneroTxWallet> txs = wallet.getTxs(query);
// warn on double spend // TODO: other handling?
for (MoneroTxWallet tx : txs) {
if (Boolean.TRUE.equals(tx.isDoubleSpendSeen())) log.warn("Double spend seen for tx {} for {} {}", tx.getHash(), getClass().getSimpleName(), getShortId());
}
// check deposit txs
if (!isDepositsUnlocked()) { if (!isDepositsUnlocked()) {
// update trader txs // sync wallet if behind
MoneroTxWallet makerDepositTx = null; syncWalletIfBehind();
MoneroTxWallet takerDepositTx = null;
for (MoneroTxWallet tx : txs) {
if (tx.getHash().equals(processModel.getMaker().getDepositTxHash())) makerDepositTx = tx;
if (tx.getHash().equals(processModel.getTaker().getDepositTxHash())) takerDepositTx = tx;
}
if (makerDepositTx != null) getMaker().setDepositTx(makerDepositTx);
if (takerDepositTx != null) getTaker().setDepositTx(takerDepositTx);
// skip if deposit txs not seen // get txs from trade wallet
if (makerDepositTx == null || takerDepositTx == null) return; List<MoneroTxWallet> txs = wallet.getTxs(new MoneroTxQuery().setIncludeOutputs(true).setInTxPool(false)); // TODO (monero-wallet-rpc): cannot get pool txs without re-refetching from pool
setDepositTxs(txs);
if (txs.size() != 2) return; // skip if either tx not seen
setStateDepositsSeen();
// set security deposits // set actual security deposits
if (getBuyer().getSecurityDeposit().longValueExact() == 0) { if (getBuyer().getSecurityDeposit().longValueExact() == 0) {
BigInteger buyerSecurityDeposit = ((MoneroTxWallet) getBuyer().getDepositTx()).getIncomingAmount(); BigInteger buyerSecurityDeposit = ((MoneroTxWallet) getBuyer().getDepositTx()).getIncomingAmount();
BigInteger sellerSecurityDeposit = ((MoneroTxWallet) getSeller().getDepositTx()).getIncomingAmount().subtract(getAmount()); BigInteger sellerSecurityDeposit = ((MoneroTxWallet) getSeller().getDepositTx()).getIncomingAmount().subtract(getAmount());
@ -2180,21 +2148,44 @@ public abstract class Trade implements Tradable, Model {
getSeller().setSecurityDeposit(sellerSecurityDeposit); getSeller().setSecurityDeposit(sellerSecurityDeposit);
} }
// update state // check for deposit txs confirmation
setStateDepositsPublished(); if (getMaker().getDepositTx().isConfirmed() && getTaker().getDepositTx().isConfirmed()) setStateDepositsConfirmed();
if (makerDepositTx.isConfirmed() && takerDepositTx.isConfirmed()) setStateDepositsConfirmed();
if (!makerDepositTx.isLocked() && !takerDepositTx.isLocked()) setStateDepositsUnlocked(); // check for deposit txs unlocked
if (getMaker().getDepositTx().getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK && getTaker().getDepositTx().getNumConfirmations() >= XmrWalletService.NUM_BLOCKS_UNLOCK) {
setStateDepositsUnlocked();
}
} }
// check payout tx // check for payout tx
if (isDepositsUnlocked()) { if (isDepositsUnlocked()) {
// determine if payout tx expected
boolean isPayoutExpected = isPaymentReceived() || hasPaymentReceivedMessage() || hasDisputeClosedMessage() || disputeState.ordinal() >= DisputeState.ARBITRATOR_SENT_DISPUTE_CLOSED_MSG.ordinal();
// sync wallet if payout expected or payout is published
if (isPayoutExpected || isPayoutPublished()) syncWalletIfBehind();
// rescan spent outputs to detect unconfirmed payout tx
if (isPayoutExpected && !isPayoutPublished()) {
try {
wallet.rescanSpent();
} catch (Exception e) {
log.warn("Error rescanning spent outputs to detect payout tx for {} {}, errorMessage={}", getClass().getSimpleName(), getShortId(), e.getMessage());
}
}
// get txs from trade wallet
boolean checkPool = isPayoutExpected && !isPayoutConfirmed();
MoneroTxQuery query = new MoneroTxQuery().setIncludeOutputs(true);
if (!checkPool) query.setInTxPool(false); // avoid pool check if possible
List<MoneroTxWallet> txs = wallet.getTxs(query);
setDepositTxs(txs);
// check if any outputs spent (observed on payout published) // check if any outputs spent (observed on payout published)
for (MoneroTxWallet tx : txs) { for (MoneroTxWallet tx : txs) {
for (MoneroOutputWallet output : tx.getOutputsWallet()) { for (MoneroOutputWallet output : tx.getOutputsWallet()) {
if (Boolean.TRUE.equals(output.isSpent())) { if (Boolean.TRUE.equals(output.isSpent())) setPayoutStatePublished();
setPayoutStatePublished();
}
} }
} }
@ -2224,6 +2215,17 @@ public abstract class Trade implements Tradable, Model {
} }
} }
private void syncWalletIfBehind() {
if (wallet.getHeight() < xmrConnectionService.getTargetHeight()) syncWallet(false);
}
private void setDepositTxs(List<? extends MoneroTx> txs) {
for (MoneroTx tx : txs) {
if (tx.getHash().equals(getMaker().getDepositTxHash())) getMaker().setDepositTx(tx);
if (tx.getHash().equals(getTaker().getDepositTxHash())) getTaker().setDepositTx(tx);
}
}
private void forceRestartTradeWallet() { private void forceRestartTradeWallet() {
log.warn("Force restarting trade wallet for {} {}", getClass().getSimpleName(), getId()); log.warn("Force restarting trade wallet for {} {}", getClass().getSimpleName(), getId());
if (isShutDownStarted || restartInProgress) return; if (isShutDownStarted || restartInProgress) return;
@ -2231,15 +2233,10 @@ public abstract class Trade implements Tradable, Model {
forceCloseWallet(); forceCloseWallet();
if (!isShutDownStarted) wallet = getWallet(); if (!isShutDownStarted) wallet = getWallet();
restartInProgress = false; restartInProgress = false;
if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitSyncing(), getId()); if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId());
} }
private long getWalletRefreshPeriod() { private void setStateDepositsSeen() {
if (isIdling()) return IDLE_SYNC_PERIOD_MS;
return xmrConnectionService.getRefreshPeriodMs();
}
private void setStateDepositsPublished() {
if (!isDepositsPublished()) setState(State.DEPOSIT_TXS_SEEN_IN_NETWORK); if (!isDepositsPublished()) setState(State.DEPOSIT_TXS_SEEN_IN_NETWORK);
} }

View file

@ -458,8 +458,8 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}, trade.getId()); }, trade.getId());
} }
public void handle(DepositsConfirmedMessage response, NodeAddress sender) { public void handle(DepositsConfirmedMessage message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)"); System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage) from " + sender);
if (!trade.isInitialized() || trade.isShutDown()) return; if (!trade.isInitialized() || trade.isShutDown()) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
synchronized (trade) { synchronized (trade) {
@ -467,7 +467,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
latchTrade(); latchTrade();
this.errorMessageHandler = null; this.errorMessageHandler = null;
expect(new Condition(trade) expect(new Condition(trade)
.with(response) .with(message)
.from(sender)) .from(sender))
.setup(tasks( .setup(tasks(
ProcessDepositsConfirmedMessage.class, ProcessDepositsConfirmedMessage.class,
@ -475,10 +475,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
MaybeResendDisputeClosedMessageWithPayout.class) MaybeResendDisputeClosedMessageWithPayout.class)
.using(new TradeTaskRunner(trade, .using(new TradeTaskRunner(trade,
() -> { () -> {
handleTaskRunnerSuccess(sender, response); handleTaskRunnerSuccess(sender, message);
}, },
errorMessage -> { errorMessage -> {
handleTaskRunnerFault(sender, response, errorMessage); handleTaskRunnerFault(sender, message, errorMessage);
}))) })))
.executeTasks(); .executeTasks();
awaitTradeLatch(); awaitTradeLatch();

View file

@ -132,6 +132,7 @@ public class XmrWalletService {
private static final boolean PRINT_STACK_TRACE = false; private static final boolean PRINT_STACK_TRACE = false;
private static final String THREAD_ID = XmrWalletService.class.getSimpleName(); private static final String THREAD_ID = XmrWalletService.class.getSimpleName();
private static final long SHUTDOWN_TIMEOUT_MS = 60000; private static final long SHUTDOWN_TIMEOUT_MS = 60000;
private static final long NUM_BLOCKS_BEHIND_WARNING = 10;
private final User user; private final User user;
private final Preferences preferences; private final Preferences preferences;
@ -165,7 +166,7 @@ public class XmrWalletService {
// wallet polling and cache // wallet polling and cache
private TaskLooper pollLooper; private TaskLooper pollLooper;
private boolean pollInProgress; private boolean pollInProgress;
private Long walletRefreshPeriodMs; private Long pollPeriodMs;
private final Object pollLock = new Object(); private final Object pollLock = new Object();
private Long cachedHeight; private Long cachedHeight;
private BigInteger cachedBalance; private BigInteger cachedBalance;
@ -1400,6 +1401,7 @@ public class XmrWalletService {
}); });
syncWithProgressLooper.start(1000); syncWithProgressLooper.start(1000);
HavenoUtils.awaitLatch(syncWithProgressLatch); HavenoUtils.awaitLatch(syncWithProgressLatch);
wallet.stopSyncing();
if (!wasWalletSynced) throw new IllegalStateException("Failed to sync wallet with progress"); if (!wasWalletSynced) throw new IllegalStateException("Failed to sync wallet with progress");
} }
@ -1441,7 +1443,7 @@ public class XmrWalletService {
try { try {
// create wallet // create wallet
log.info("Creating full wallet " + config.getPath() + " connected to daemon " + connection.getUri()); log.info("Creating full wallet " + config.getPath() + " connected to monerod=" + connection.getUri());
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
config.setServer(connection); config.setServer(connection);
walletFull = MoneroWalletFull.createWallet(config); walletFull = MoneroWalletFull.createWallet(config);
@ -1495,7 +1497,7 @@ public class XmrWalletService {
walletRpc.stopSyncing(); walletRpc.stopSyncing();
// create wallet // create wallet
log.info("Creating RPC wallet " + config.getPath() + " connected to daemon " + connection.getUri()); log.info("Creating RPC wallet " + config.getPath() + " connected to monerod=" + connection.getUri());
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
config.setServer(connection); config.setServer(connection);
walletRpc.createWallet(config); walletRpc.createWallet(config);
@ -1602,9 +1604,8 @@ public class XmrWalletService {
// sync wallet on new thread // sync wallet on new thread
if (connection != null && !isShutDownStarted) { if (connection != null && !isShutDownStarted) {
updateWalletRefreshPeriod();
wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE); wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE);
wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs()); updatePollPeriod();
} }
log.info("Done setting main wallet monerod=" + (wallet.getDaemonConnection() == null ? null : wallet.getDaemonConnection().getUri())); log.info("Done setting main wallet monerod=" + (wallet.getDaemonConnection() == null ? null : wallet.getDaemonConnection().getUri()));
@ -1675,9 +1676,9 @@ public class XmrWalletService {
synchronized (walletLock) { synchronized (walletLock) {
if (isShutDownStarted || isPollInProgress()) return; if (isShutDownStarted || isPollInProgress()) return;
log.info("Starting to poll main wallet"); log.info("Starting to poll main wallet");
updateWalletRefreshPeriod(); updatePollPeriod();
pollLooper = new TaskLooper(() -> pollWallet()); pollLooper = new TaskLooper(() -> pollWallet());
pollLooper.start(walletRefreshPeriodMs); pollLooper.start(pollPeriodMs);
} }
} }
@ -1692,24 +1693,20 @@ public class XmrWalletService {
return pollLooper != null; return pollLooper != null;
} }
public void updateWalletRefreshPeriod() { public void updatePollPeriod() {
if (isShutDownStarted) return; if (isShutDownStarted) return;
setWalletRefreshPeriod(getWalletRefreshPeriod()); setPollPeriod(getPollPeriod());
} }
private long getWalletRefreshPeriod() { private long getPollPeriod() {
return xmrConnectionService.getRefreshPeriodMs(); return xmrConnectionService.getRefreshPeriodMs();
} }
private void setWalletRefreshPeriod(long walletRefreshPeriodMs) { private void setPollPeriod(long pollPeriodMs) {
synchronized (walletLock) { synchronized (walletLock) {
if (this.isShutDownStarted) return; if (this.isShutDownStarted) return;
if (this.walletRefreshPeriodMs != null && this.walletRefreshPeriodMs == walletRefreshPeriodMs) return; if (this.pollPeriodMs != null && this.pollPeriodMs == pollPeriodMs) return;
this.walletRefreshPeriodMs = walletRefreshPeriodMs; this.pollPeriodMs = pollPeriodMs;
if (getWallet() != null) {
log.info("Setting main wallet refresh rate for to {}", getWalletRefreshPeriod());
getWallet().startSyncing(getWalletRefreshPeriod()); // TODO (monero-project): wallet rpc waits until last sync period finishes before starting new sync period
}
if (isPollInProgress()) { if (isPollInProgress()) {
stopPolling(); stopPolling();
startPolling(); startPolling();
@ -1718,24 +1715,31 @@ public class XmrWalletService {
} }
private void pollWallet() { private void pollWallet() {
if (pollInProgress) return;
doPollWallet(true); doPollWallet(true);
} }
private void doPollWallet(boolean updateTxs) { private void doPollWallet(boolean updateTxs) {
if (pollInProgress) return;
synchronized (pollLock) { synchronized (pollLock) {
pollInProgress = true; pollInProgress = true;
try { try {
// log warning if wallet is too far behind daemon // log warning if wallet is too far behind daemon
MoneroDaemonInfo lastInfo = xmrConnectionService.getLastInfo(); MoneroDaemonInfo lastInfo = xmrConnectionService.getLastInfo();
if (lastInfo == null) {
log.warn("Last daemon info is null");
return;
}
long walletHeight = wallet.getHeight(); long walletHeight = wallet.getHeight();
int maxBlocksBehindWarning = 10; if (wasWalletSynced && walletHeight < xmrConnectionService.getTargetHeight() - NUM_BLOCKS_BEHIND_WARNING && !Config.baseCurrencyNetwork().isTestnet()) {
if (wasWalletSynced && lastInfo != null && walletHeight < lastInfo.getHeight() - maxBlocksBehindWarning && !Config.baseCurrencyNetwork().isTestnet()) { log.warn("Main wallet is {} blocks behind monerod, wallet height={}, monerod height={},", xmrConnectionService.getTargetHeight() - walletHeight, walletHeight, lastInfo.getHeight());
log.warn("Main wallet is more than {} blocks behind monerod, wallet height={}, monerod height={},", maxBlocksBehindWarning, walletHeight, lastInfo.getHeight());
} }
// fetch transactions from pool and cache // sync wallet if behind daemon
if (wallet.getHeight() < xmrConnectionService.getTargetHeight()) wallet.sync();
// fetch transactions from pool and store to cache
// TODO: ideally wallet should sync every poll and then avoid checking pool on fetching txs
if (updateTxs) { if (updateTxs) {
try { try {
cachedTxs = wallet.getTxs(new MoneroTxQuery().setIncludeOutputs(true)); cachedTxs = wallet.getTxs(new MoneroTxQuery().setIncludeOutputs(true));

View file

@ -131,7 +131,7 @@ public class TxIdTextField extends AnchorPane {
txUpdater = new MoneroWalletListener() { txUpdater = new MoneroWalletListener() {
@Override @Override
public void onNewBlock(long lastBlockHeight) { public void onNewBlock(long lastBlockHeight) {
updateConfidence(txId, false, lastBlockHeight + 1); updateConfidence(txId, false, lastBlockHeight);
} }
}; };
xmrWalletService.addWalletListener(txUpdater); xmrWalletService.addWalletListener(txUpdater);