move processing off UserThread for smoother experience

This commit is contained in:
woodser 2023-12-17 09:38:30 -05:00
parent ba9a9a3dcc
commit e6775f3b58
16 changed files with 276 additions and 215 deletions

View file

@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -59,6 +60,19 @@ public class UserThread {
UserThread.executor.execute(command);
}
public static void await(Runnable command) {
CountDownLatch latch = new CountDownLatch(1);
executor.execute(() -> {
command.run();
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Prefer FxTimer if a delay is needed in a JavaFx class (gui module)
public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) {
return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS);

View file

@ -20,6 +20,7 @@ import javafx.beans.property.LongProperty;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyDoubleProperty;
import javafx.beans.property.ReadOnlyIntegerProperty;
import javafx.beans.property.ReadOnlyLongProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.SimpleIntegerProperty;
import javafx.beans.property.SimpleLongProperty;
@ -61,9 +62,11 @@ public final class XmrConnectionService {
private final MoneroConnectionManager connectionManager;
private final EncryptedConnectionList connectionList;
private final ObjectProperty<List<MoneroPeer>> peers = new SimpleObjectProperty<>();
private final ObjectProperty<MoneroRpcConnection> connectionProperty = new SimpleObjectProperty<>();
private final IntegerProperty numPeers = new SimpleIntegerProperty(0);
private final LongProperty chainHeight = new SimpleLongProperty(0);
private final DownloadListener downloadListener = new DownloadListener();
private final LongProperty numUpdates = new SimpleLongProperty(0);
private Socks5ProxyProvider socks5ProxyProvider;
private boolean isInitialized;
@ -286,6 +289,10 @@ public final class XmrConnectionService {
return peers;
}
public ReadOnlyObjectProperty<MoneroRpcConnection> connectionProperty() {
return connectionProperty;
}
public boolean hasSufficientPeersForBroadcast() {
return numPeers.get() >= getMinBroadcastConnections();
}
@ -306,6 +313,10 @@ public final class XmrConnectionService {
return downloadPercentageProperty().get() == 1d;
}
public ReadOnlyLongProperty numUpdatesProperty() {
return numUpdates;
}
// ------------------------------- HELPERS --------------------------------
private void doneDownload() {
@ -517,6 +528,12 @@ public final class XmrConnectionService {
connectionList.addConnection(currentConnection);
connectionList.setCurrentConnectionUri(currentConnection.getUri());
}
// set connection property on user thread
UserThread.execute(() -> {
connectionProperty.set(currentConnection);
numUpdates.set(numUpdates.get() + 1);
});
}
updatePolling();
@ -564,31 +581,38 @@ public final class XmrConnectionService {
if (daemon == null) throw new RuntimeException("No daemon connection");
lastInfo = daemon.getInfo();
// set chain height
chainHeight.set(lastInfo.getHeight());
// update properties on user thread
UserThread.execute(() -> {
// update sync progress
boolean isTestnet = Config.baseCurrencyNetwork() == BaseCurrencyNetwork.XMR_LOCAL;
if (lastInfo.isSynchronized() || isTestnet) doneDownload(); // TODO: skipping synchronized check for testnet because tests cannot sync 3rd local node, see "Can manage Monero daemon connections"
else if (lastInfo.isBusySyncing()) {
long targetHeight = lastInfo.getTargetHeight();
long blocksLeft = targetHeight - lastInfo.getHeight();
if (syncStartHeight == null) syncStartHeight = lastInfo.getHeight();
double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, lastInfo.getHeight() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress
downloadListener.progress(percent, blocksLeft, null);
}
// set chain height
chainHeight.set(lastInfo.getHeight());
// update sync progress
boolean isTestnet = Config.baseCurrencyNetwork() == BaseCurrencyNetwork.XMR_LOCAL;
if (lastInfo.isSynchronized() || isTestnet) doneDownload(); // TODO: skipping synchronized check for testnet because tests cannot sync 3rd local node, see "Can manage Monero daemon connections"
else if (lastInfo.isBusySyncing()) {
long targetHeight = lastInfo.getTargetHeight();
long blocksLeft = targetHeight - lastInfo.getHeight();
if (syncStartHeight == null) syncStartHeight = lastInfo.getHeight();
double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, lastInfo.getHeight() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress
downloadListener.progress(percent, blocksLeft, null);
}
// set peer connections
// TODO: peers often uknown due to restricted RPC call, skipping call to get peer connections
// try {
// peers.set(getOnlinePeers());
// } catch (Exception err) {
// // TODO: peers unknown due to restricted RPC call
// }
// numPeers.set(peers.get().size());
numPeers.set(lastInfo.getNumOutgoingConnections() + lastInfo.getNumIncomingConnections());
peers.set(new ArrayList<MoneroPeer>());
// notify update
numUpdates.set(numUpdates.get() + 1);
});
// set peer connections
// TODO: peers often uknown due to restricted RPC call, skipping call to get peer connections
// try {
// peers.set(getOnlinePeers());
// } catch (Exception err) {
// // TODO: peers unknown due to restricted RPC call
// }
// numPeers.set(peers.get().size());
numPeers.set(lastInfo.getNumOutgoingConnections() + lastInfo.getNumIncomingConnections());
peers.set(new ArrayList<MoneroPeer>());
// handle error recovery
if (lastErrorTimestamp != null) {
log.info("Successfully fetched daemon info after previous error");

View file

@ -42,11 +42,13 @@ import haveno.core.setup.CorePersistedDataHost;
import haveno.core.setup.CoreSetup;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.trade.statistics.TradeStatisticsManager;
import haveno.core.xmr.setup.WalletsSetup;
import haveno.core.xmr.wallet.BtcWalletService;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.network.Connection;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -337,7 +339,12 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven
Set<Runnable> tasks = new HashSet<Runnable>();
tasks.add(() -> injector.getInstance(XmrWalletService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
HavenoUtils.executeTasks(tasks); // notify in parallel
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}
injector.getInstance(PriceFeedService.class).shutDown();
injector.getInstance(ArbitratorManager.class).shutDown();
@ -357,6 +364,10 @@ public abstract class HavenoExecutable implements GracefulShutDownHandler, Haven
// shut down monero wallets and connections
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {
log.info("Shutting down connections");
Connection.shutDownExecutor(30);
// done shutting down
log.info("Graceful shutdown completed. Exiting now.");
module.close(injector);
completeShutdown(resultHandler, EXIT_SUCCESS, systemExit);

View file

@ -109,13 +109,13 @@ public class WalletAppSetup {
log.info("Initialize WalletAppSetup with monero-java version {}", MoneroUtils.getVersion());
ObjectProperty<Throwable> walletServiceException = new SimpleObjectProperty<>();
xmrInfoBinding = EasyBind.combine(xmrConnectionService.downloadPercentageProperty(),
xmrConnectionService.chainHeightProperty(),
xmrInfoBinding = EasyBind.combine(
xmrConnectionService.numUpdatesProperty(), // receives notification of any connection update
xmrWalletService.downloadPercentageProperty(),
xmrWalletService.walletHeightProperty(),
walletServiceException,
getWalletServiceErrorMsg(),
(chainDownloadPercentage, chainHeight, walletDownloadPercentage, walletHeight, exception, errorMsg) -> {
(numConnectionUpdates, walletDownloadPercentage, walletHeight, exception, errorMsg) -> {
String result;
if (exception == null && errorMsg == null) {
@ -137,9 +137,9 @@ public class WalletAppSetup {
} else {
// update daemon sync progress
double chainDownloadPercentageD = (double) chainDownloadPercentage;
double chainDownloadPercentageD = xmrConnectionService.downloadPercentageProperty().doubleValue();
xmrDaemonSyncProgress.set(chainDownloadPercentageD);
Long bestChainHeight = chainHeight == null ? null : (Long) chainHeight;
Long bestChainHeight = xmrConnectionService.chainHeightProperty().get();
String chainHeightAsString = bestChainHeight != null && bestChainHeight > 0 ? String.valueOf(bestChainHeight) : "";
if (chainDownloadPercentageD == 1) {
String synchronizedWith = Res.get("mainView.footer.xmrInfo.connectedTo", getXmrDaemonNetworkAsString(), chainHeightAsString);

View file

@ -32,11 +32,13 @@ import haveno.core.offer.OfferBookService;
import haveno.core.offer.OpenOfferManager;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.xmr.setup.WalletsSetup;
import haveno.core.xmr.wallet.BtcWalletService;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.network.Connection;
import haveno.network.p2p.seed.SeedNodeRepository;
import lombok.extern.slf4j.Slf4j;
@ -93,11 +95,16 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable {
try {
if (injector != null) {
// notify trade protocols and wallets to prepare for shut down before shutting down
// notify trade protocols and wallets to prepare for shut down
Set<Runnable> tasks = new HashSet<Runnable>();
tasks.add(() -> injector.getInstance(XmrWalletService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
HavenoUtils.executeTasks(tasks); // notify in parallel
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}
JsonFileManager.shutDownAllInstances();
injector.getInstance(ArbitratorManager.class).shutDown();
@ -117,8 +124,12 @@ public abstract class ExecutableForAppWithP2p extends HavenoExecutable {
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {
module.close(injector);
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
resultHandler.handleResult();
log.info("Shutting down connections");
Connection.shutDownExecutor(30);
// done shutting down
log.info("Graceful shutdown completed. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(HavenoExecutable.EXIT_SUCCESS), 1);
});
});

View file

@ -229,11 +229,12 @@ public class OfferFilterService {
Arbitrator thisArbitrator = user.getRegisteredArbitrator();
if (thisArbitrator != null && thisArbitrator.getNodeAddress().equals(offer.getOfferPayload().getArbitratorSigner())) {
if (thisArbitrator.getNodeAddress().equals(p2PService.getNetworkNode().getNodeAddress())) arbitrator = thisArbitrator; // TODO: unnecessary to compare arbitrator and p2pservice address?
} else {
// otherwise log warning that arbitrator is unregistered
List<NodeAddress> arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList());
log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses);
}
// otherwise log warning
List<NodeAddress> arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList());
log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses);
}
if (arbitrator == null) return false; // invalid arbitrator

View file

@ -111,6 +111,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener, PersistedDataHost {
private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class);
private static final String THREAD_ID = OpenOfferManager.class.getSimpleName();
private static final long RETRY_REPUBLISH_DELAY_SEC = 10;
private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 30;
private static final long REPUBLISH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(40);
@ -307,6 +308,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}
public void shutDown(@Nullable Runnable completeHandler) {
HavenoUtils.shutDownThreadId(THREAD_ID);
stopped = true;
p2PService.getPeerManager().removeListener(this);
p2PService.removeDecryptedDirectMessageListener(this);
@ -403,56 +405,62 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
maybeUpdatePersistedOffers();
// Republish means we send the complete offer object
republishOffers();
startPeriodicRepublishOffersTimer();
HavenoUtils.submitToThread(() -> {
// Wait for prices to be available
priceFeedService.awaitPrices();
// Refresh is started once we get a success from republish
// Republish means we send the complete offer object
republishOffers();
startPeriodicRepublishOffersTimer();
// We republish after a bit as it might be that our connected node still has the offer in the data map
// but other peers have it already removed because of expired TTL.
// Those other not directly connected peers would not get the broadcast of the new offer, as the first
// connected peer (seed node) does not broadcast if it has the data in the map.
// To update quickly to the whole network we repeat the republishOffers call after a few seconds when we
// are better connected to the network. There is no guarantee that all peers will receive it but we also
// have our periodic timer, so after that longer interval the offer should be available to all peers.
if (retryRepublishOffersTimer == null)
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC);
// Refresh is started once we get a success from republish
p2PService.getPeerManager().addListener(this);
// We republish after a bit as it might be that our connected node still has the offer in the data map
// but other peers have it already removed because of expired TTL.
// Those other not directly connected peers would not get the broadcast of the new offer, as the first
// connected peer (seed node) does not broadcast if it has the data in the map.
// To update quickly to the whole network we repeat the republishOffers call after a few seconds when we
// are better connected to the network. There is no guarantee that all peers will receive it but we also
// have our periodic timer, so after that longer interval the offer should be available to all peers.
if (retryRepublishOffersTimer == null)
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC);
// TODO: add to invalid offers on failure
// openOffers.stream()
// .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService)
// .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg))));
p2PService.getPeerManager().addListener(this);
// process scheduled offers
processScheduledOffers((transaction) -> {}, (errorMessage) -> {
log.warn("Error processing unposted offers: " + errorMessage);
});
// TODO: add to invalid offers on failure
// openOffers.stream()
// .forEach(openOffer -> OfferUtil.getInvalidMakerFeeTxErrorMessage(openOffer.getOffer(), btcWalletService)
// .ifPresent(errorMsg -> invalidOffers.add(new Tuple2<>(openOffer, errorMsg))));
// register to process unposted offers when unlocked balance increases
if (xmrWalletService.getWallet() != null) lastUnlockedBalance = xmrWalletService.getWallet().getUnlockedBalance(0);
xmrWalletService.addWalletListener(new MoneroWalletListener() {
@Override
public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) {
if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) {
processScheduledOffers((transaction) -> {}, (errorMessage) -> {
log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post
});
// process scheduled offers
processScheduledOffers((transaction) -> {}, (errorMessage) -> {
log.warn("Error processing unposted offers: " + errorMessage);
});
// register to process unposted offers when unlocked balance increases
if (xmrWalletService.getWallet() != null) lastUnlockedBalance = xmrWalletService.getWallet().getUnlockedBalance(0);
xmrWalletService.addWalletListener(new MoneroWalletListener() {
@Override
public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) {
if (lastUnlockedBalance == null || lastUnlockedBalance.compareTo(newUnlockedBalance) < 0) {
processScheduledOffers((transaction) -> {}, (errorMessage) -> {
log.warn("Error processing unposted offers on new unlocked balance: " + errorMessage); // TODO: popup to notify user that offer did not post
});
}
lastUnlockedBalance = newUnlockedBalance;
}
lastUnlockedBalance = newUnlockedBalance;
});
// initialize key image poller for signed offers
maybeInitializeKeyImagePoller();
// poll spent status of key images
for (SignedOffer signedOffer : signedOffers.getList()) {
signedOfferKeyImagePoller.addKeyImages(signedOffer.getReserveTxKeyImages());
}
});
// initialize key image poller for signed offers
maybeInitializeKeyImagePoller();
// poll spent status of key images
for (SignedOffer signedOffer : signedOffers.getList()) {
signedOfferKeyImagePoller.addKeyImages(signedOffer.getReserveTxKeyImages());
}
}, THREAD_ID);
}
@ -503,7 +511,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer openOffer = new OpenOffer(offer, triggerPrice, reserveExactAmount);
// schedule or post offer
new Thread(() -> {
HavenoUtils.submitToThread(() -> {
synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> {
@ -520,7 +528,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
});
HavenoUtils.awaitLatch(latch);
}
}).start();
}, THREAD_ID);
}
// Remove from offerbook
@ -804,7 +812,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void processScheduledOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler
ErrorMessageHandler errorMessageHandler) {
new Thread(() -> {
HavenoUtils.submitToThread(() -> {
synchronized (processOffersLock) {
List<String> errorMessages = new ArrayList<String>();
List<OpenOffer> openOffers = getOpenOffers();
@ -825,7 +833,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString());
else resultHandler.handleResult(null);
}
}).start();
}, THREAD_ID);
}
private void processUnpostedOffer(List<OpenOffer> openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
@ -1569,8 +1577,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer();
priceFeedService.awaitPrices();
new Thread(() -> {
processListForRepublishOffers(getOpenOffers());
}).start();
@ -1653,7 +1659,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, openOffer.getTriggerPrice());
// repost offer
new Thread(() -> {
HavenoUtils.submitToThread(() -> {
synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(getOpenOffers(), updatedOpenOffer, (transaction) -> {
@ -1670,7 +1676,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
});
HavenoUtils.awaitLatch(latch);
}
}).start();
}, THREAD_ID);
}
}

View file

@ -133,7 +133,7 @@ public class MakerSendSignOfferRequest extends Task<PlaceOfferModel> {
// if unavailable, try alternative arbitrator
@Override
public void onFault(String errorMessage) {
log.warn("Arbitrator {} unavailable: {}", arbitratorNodeAddress, errorMessage);
log.warn("Arbitrator unavailable: address={}: {}", arbitratorNodeAddress, errorMessage);
excludedArbitrators.add(arbitratorNodeAddress);
Arbitrator altArbitrator = DisputeAgentSelection.getRandomArbitrator(model.getArbitratorManager(), excludedArbitrators);
if (altArbitrator == null) {

View file

@ -116,7 +116,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
log.info("Received {} from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getSenderNodeAddress(), message.getTradeId(), message.getUid());
HavenoUtils.runTask(message.getTradeId(), () -> {
HavenoUtils.submitToThread(() -> {
if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) {
@ -126,7 +126,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
} else {
log.warn("Unsupported message at dispatchMessage. message={}", message);
}
});
}, message.getTradeId());
}
}

View file

@ -473,14 +473,22 @@ public class HavenoUtils {
}
}
public static Future<?> runTask(String threadId, Runnable task) {
public static Future<?> submitToPool(Runnable task) {
return POOL.submit(task);
}
public static Future<?> submitToSharedThread(Runnable task) {
return submitToThread(task, HavenoUtils.class.getSimpleName());
}
public static Future<?> submitToThread(Runnable task, String threadId) {
synchronized (POOLS) {
if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1));
return POOLS.get(threadId).submit(task);
}
}
public static void removeThreadId(String threadId) {
public static void shutDownThreadId(String threadId) {
synchronized (POOLS) {
if (POOLS.containsKey(threadId)) {
POOLS.get(threadId).shutdown();
@ -489,33 +497,20 @@ public class HavenoUtils {
}
}
/**
* Submit tasks to a global thread pool.
*/
public static Future<?> submitTask(Runnable task) {
return POOL.submit(task);
// TODO: update monero-java and replace with GenUtils.awaitTasks()
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks) {
return awaitTasks(tasks, tasks.size());
}
public static List<Future<?>> submitTasks(List<Runnable> tasks) {
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency) {
return awaitTasks(tasks, maxConcurrency, null);
}
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutSeconds) {
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(submitTask(task));
return futures;
}
// TODO: replace with GenUtils.executeTasks() once monero-java updated
public static void executeTasks(Collection<Runnable> tasks) {
executeTasks(tasks, tasks.size());
}
public static void executeTasks(Collection<Runnable> tasks, int maxConcurrency) {
executeTasks(tasks, maxConcurrency, null);
}
public static void executeTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutSeconds) {
if (tasks.isEmpty()) return;
if (tasks.isEmpty()) return futures;
ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(pool.submit(task));
pool.shutdown();
@ -535,6 +530,7 @@ public class HavenoUtils {
} catch (Exception e) {
throw new RuntimeException(e);
}
return futures;
}
public static String toCamelCase(String underscore) {

View file

@ -590,7 +590,7 @@ public abstract class Trade implements Tradable, Model {
// handle daemon changes with max parallelization
xmrWalletService.getConnectionService().addConnectionListener(newConnection -> {
HavenoUtils.submitTask(() -> onConnectionChanged(newConnection));
HavenoUtils.submitToPool(() -> onConnectionChanged(newConnection));
});
// check if done
@ -1812,9 +1812,7 @@ public abstract class Trade implements Tradable, Model {
// sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) {
HavenoUtils.submitTask(() -> {
initSyncing();
});
new Thread(() -> initSyncing()).start();
}
}
}
@ -2053,7 +2051,7 @@ public abstract class Trade implements Tradable, Model {
@Override
public void onNewBlock(long height) {
HavenoUtils.submitTask(() -> { // allow rapid notifications
HavenoUtils.submitToThread(() -> { // allow rapid notifications
// skip rapid succession blocks
synchronized (this) {
@ -2087,7 +2085,7 @@ public abstract class Trade implements Tradable, Model {
e.printStackTrace();
if (isInitialized && !isShutDownStarted && !isWalletConnected()) throw e;
}
});
}, getId());
}
}

View file

@ -235,7 +235,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
if (!(networkEnvelope instanceof TradeMessage)) return;
String tradeId = ((TradeMessage) networkEnvelope).getTradeId();
HavenoUtils.runTask(tradeId, () -> {
HavenoUtils.submitToThread(() -> {
if (networkEnvelope instanceof InitTradeRequest) {
handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer);
} else if (networkEnvelope instanceof InitMultisigRequest) {
@ -249,7 +249,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} else if (networkEnvelope instanceof DepositResponse) {
handleDepositResponse((DepositResponse) networkEnvelope, peer);
}
});
}, tradeId);
}
@ -316,7 +316,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
try {
HavenoUtils.executeTasks(tasks);
HavenoUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Error notifying trades that shut down started: {}", e.getMessage());
e.printStackTrace();
@ -346,7 +346,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
try {
HavenoUtils.executeTasks(tasks);
HavenoUtils.awaitTasks(tasks);
} catch (Exception e) {
log.warn("Error shutting down trades: {}", e.getMessage());
e.printStackTrace();
@ -443,7 +443,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
});
};
HavenoUtils.executeTasks(tasks, threadPoolSize);
HavenoUtils.awaitTasks(tasks, threadPoolSize);
log.info("Done initializing persisted trades");
if (isShutDownStarted) return;
@ -452,7 +452,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// sync idle trades once in background after active trades
for (Trade trade : trades) {
if (trade.isIdling()) HavenoUtils.submitTask(() -> trade.syncAndPollWallet());
if (trade.isIdling()) HavenoUtils.submitToPool(() -> trade.syncAndPollWallet());
}
// process after all wallets initialized
@ -1205,7 +1205,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// remove trade
tradableList.remove(trade);
HavenoUtils.removeThreadId(trade.getId());
HavenoUtils.shutDownThreadId(trade.getId());
// unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));

View file

@ -104,6 +104,7 @@ public class XmrWalletService {
private static final int MONERO_LOG_LEVEL = 0;
private static final int MAX_SYNC_ATTEMPTS = 3;
private static final boolean PRINT_STACK_TRACE = false;
private static final String THREAD_ID = XmrWalletService.class.getSimpleName();
private final Preferences preferences;
private final CoreAccountService accountService;
@ -668,9 +669,6 @@ public class XmrWalletService {
wallet.removeListener(listener);
}
}
// prepare trades for shut down
if (tradeManager != null) tradeManager.onShutDownStarted();
}
public void shutDown() {
@ -681,7 +679,7 @@ public class XmrWalletService {
List<Runnable> tasks = new ArrayList<Runnable>();
if (tradeManager != null) tasks.add(() -> tradeManager.shutDown());
tasks.add(() -> closeMainWallet(true));
HavenoUtils.executeTasks(tasks);
HavenoUtils.awaitTasks(tasks);
log.info("Done shutting down all wallets");
}
@ -767,12 +765,12 @@ public class XmrWalletService {
// reschedule to init main wallet
UserThread.runAfter(() -> {
new Thread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS)).start();
HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, MAX_SYNC_ATTEMPTS), THREAD_ID);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
} else {
log.warn("Trying again in {} seconds", xmrConnectionService.getRefreshPeriodMs() / 1000);
UserThread.runAfter(() -> {
new Thread(() -> maybeInitMainWallet(true, numAttempts - 1)).start();
HavenoUtils.submitToThread(() -> maybeInitMainWallet(true, numAttempts - 1), THREAD_ID);
}, xmrConnectionService.getRefreshPeriodMs() / 1000);
}
}
@ -803,20 +801,22 @@ public class XmrWalletService {
}
private void updateSyncProgress() {
walletHeight.set(wallet.getHeight());
UserThread.await(() -> {
walletHeight.set(wallet.getHeight());
// new wallet reports height 1 before synced
if (wallet.getHeight() == 1) {
downloadListener.progress(.0001, xmrConnectionService.getTargetHeight(), null); // >0% shows progress bar
return;
}
// new wallet reports height 1 before synced
if (wallet.getHeight() == 1) {
downloadListener.progress(.0001, xmrConnectionService.getTargetHeight(), null); // >0% shows progress bar
return;
}
// set progress
long targetHeight = xmrConnectionService.getTargetHeight();
long blocksLeft = targetHeight - walletHeight.get();
if (syncStartHeight == null) syncStartHeight = walletHeight.get();
double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, walletHeight.get() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress
downloadListener.progress(percent, blocksLeft, null);
// set progress
long targetHeight = xmrConnectionService.getTargetHeight();
long blocksLeft = targetHeight - walletHeight.get();
if (syncStartHeight == null) syncStartHeight = walletHeight.get();
double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, walletHeight.get() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress
downloadListener.progress(percent, blocksLeft, null);
});
}
private MoneroWalletRpc createWalletRpc(MoneroWalletConfig config, Integer port) {
@ -938,14 +938,16 @@ public class XmrWalletService {
// sync wallet on new thread
if (connection != null) {
wallet.getDaemonConnection().setPrintStackTrace(PRINT_STACK_TRACE);
new Thread(() -> {
try {
if (!Boolean.FALSE.equals(connection.isConnected())) wallet.sync();
wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs());
} catch (Exception e) {
log.warn("Failed to sync main wallet after setting daemon connection: " + e.getMessage());
HavenoUtils.submitToThread(() -> {
synchronized (walletLock) {
try {
if (Boolean.TRUE.equals(connection.isConnected())) wallet.sync();
wallet.startSyncing(xmrConnectionService.getRefreshPeriodMs());
} catch (Exception e) {
log.warn("Failed to sync main wallet after setting daemon connection: " + e.getMessage());
}
}
}).start();
}, THREAD_ID);
}
log.info("Done setting main wallet daemon connection: " + (connection == null ? null : connection.getUri()));
@ -977,7 +979,7 @@ public class XmrWalletService {
}
// excute tasks in parallel
HavenoUtils.executeTasks(tasks, Math.min(10, 1 + trades.size()));
HavenoUtils.awaitTasks(tasks, Math.min(10, 1 + trades.size()));
log.info("Done changing all wallet passwords");
}
@ -1259,17 +1261,14 @@ public class XmrWalletService {
BigInteger balance;
if (balanceListener.getSubaddressIndex() != null && balanceListener.getSubaddressIndex() != 0) balance = getBalanceForSubaddress(balanceListener.getSubaddressIndex());
else balance = getAvailableBalance();
UserThread.execute(new Runnable() { // TODO (woodser): don't execute on UserThread
@Override
public void run() {
try {
balanceListener.onBalanceChanged(balance);
} catch (Exception e) {
log.warn("Failed to notify balance listener of change");
e.printStackTrace();
}
HavenoUtils.submitToThread(() -> {
try {
balanceListener.onBalanceChanged(balance);
} catch (Exception e) {
log.warn("Failed to notify balance listener of change");
e.printStackTrace();
}
});
}, THREAD_ID);
}
}
@ -1313,54 +1312,39 @@ public class XmrWalletService {
@Override
public void onSyncProgress(long height, long startHeight, long endHeight, double percentDone, String message) {
UserThread.execute(new Runnable() {
@Override
public void run() {
for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message);
}
});
HavenoUtils.submitToThread(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onSyncProgress(height, startHeight, endHeight, percentDone, message);
}, THREAD_ID);
}
@Override
public void onNewBlock(long height) {
UserThread.execute(new Runnable() {
@Override
public void run() {
walletHeight.set(height);
for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height);
}
});
HavenoUtils.submitToThread(() -> {
walletHeight.set(height);
for (MoneroWalletListenerI listener : walletListeners) listener.onNewBlock(height);
}, THREAD_ID);
}
@Override
public void onBalancesChanged(BigInteger newBalance, BigInteger newUnlockedBalance) {
UserThread.execute(new Runnable() {
@Override
public void run() {
for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance);
updateBalanceListeners();
}
});
HavenoUtils.submitToThread(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onBalancesChanged(newBalance, newUnlockedBalance);
updateBalanceListeners();
}, THREAD_ID);
}
@Override
public void onOutputReceived(MoneroOutputWallet output) {
UserThread.execute(new Runnable() {
@Override
public void run() {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output);
}
});
HavenoUtils.submitToThread(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputReceived(output);
}, THREAD_ID);
}
@Override
public void onOutputSpent(MoneroOutputWallet output) {
UserThread.execute(new Runnable() {
@Override
public void run() {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output);
}
});
HavenoUtils.submitToThread(() -> {
for (MoneroWalletListenerI listener : walletListeners) listener.onOutputSpent(output);
}, THREAD_ID);
}
}
}

View file

@ -20,6 +20,7 @@ package haveno.desktop.main.funds.deposit;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import haveno.common.UserThread;
import haveno.core.locale.Res;
import haveno.core.trade.HavenoUtils;
import haveno.core.util.coin.CoinFormatter;
@ -65,9 +66,11 @@ class DepositListItem {
balanceListener = new XmrBalanceListener(addressEntry.getSubaddressIndex()) {
@Override
public void onBalanceChanged(BigInteger balance) {
DepositListItem.this.balanceAsBI = balance;
DepositListItem.this.balance.set(HavenoUtils.formatXmr(balanceAsBI));
updateUsage(addressEntry.getSubaddressIndex(), null);
UserThread.execute(() -> {
DepositListItem.this.balanceAsBI = balance;
DepositListItem.this.balance.set(HavenoUtils.formatXmr(balanceAsBI));
updateUsage(addressEntry.getSubaddressIndex(), null);
});
}
};
xmrWalletService.addBalanceListener(balanceListener);

View file

@ -902,13 +902,15 @@ public class TakeOfferView extends ActivatableViewAndModel<AnchorPane, TakeOffer
takeOfferBox.getChildren().add(takeOfferButton);
takeOfferBox.visibleProperty().addListener((observable, oldValue, newValue) -> {
if (newValue) {
fundingHBox.getChildren().remove(cancelButton2);
takeOfferBox.getChildren().add(cancelButton2);
} else if (!fundingHBox.getChildren().contains(cancelButton2)) {
takeOfferBox.getChildren().remove(cancelButton2);
fundingHBox.getChildren().add(cancelButton2);
}
UserThread.execute(() -> {
if (newValue) {
fundingHBox.getChildren().remove(cancelButton2);
takeOfferBox.getChildren().add(cancelButton2);
} else if (!fundingHBox.getChildren().contains(cancelButton2)) {
takeOfferBox.getChildren().remove(cancelButton2);
fundingHBox.getChildren().add(cancelButton2);
}
});
});
cancelButton2 = new AutoTooltipButton(Res.get("shared.cancel"));

View file

@ -74,6 +74,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@ -109,6 +110,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(240);
private static final int SHUTDOWN_TIMEOUT = 100;
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); // one shared thread to handle messages sequentially
public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
@ -122,6 +124,17 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return SHUTDOWN_TIMEOUT;
}
public static void shutDownExecutor(int timeoutSeconds) {
try {
EXECUTOR.shutdown();
if (!EXECUTOR.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) EXECUTOR.shutdownNow();
} catch (InterruptedException e) {
EXECUTOR.shutdownNow();
e.printStackTrace();
log.warn("Error shutting down connection executor: " + e.getMessage());
}
};
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
@ -211,7 +224,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
reportInvalidRequest(RuleViolation.PEER_BANNED);
}
}
UserThread.execute(() -> connectionListener.onConnection(this));
EXECUTOR.execute(() -> connectionListener.onConnection(this));
} catch (Throwable e) {
handleException(e);
}
@ -266,8 +279,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!stopped) {
protoOutputStream.writeEnvelope(networkEnvelope);
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)));
UserThread.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize));
EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessageSent(networkEnvelope, this)));
EXECUTOR.execute(() -> connectionStatistics.addSendMsgMetrics(System.currentTimeMillis() - ts, networkEnvelopeSize));
}
} catch (Throwable t) {
handleException(t);
@ -396,7 +409,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (networkEnvelope instanceof BundleOfEnvelopes) {
onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection);
} else {
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
EXECUTOR.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
}
}
@ -432,7 +445,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
envelopesToProcess.add(networkEnvelope);
}
}
envelopesToProcess.forEach(envelope -> UserThread.execute(() ->
envelopesToProcess.forEach(envelope -> EXECUTOR.execute(() ->
messageListeners.forEach(listener -> listener.onMessage(envelope, connection))));
}
@ -516,7 +529,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
}
private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
try {
protoOutputStream.onConnectionShutdown();
@ -539,7 +551,6 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
Utilities.shutdownAndAwaitTermination(executorService, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete {}", this);
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
@ -847,8 +858,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
networkEnvelope.getClass().getSimpleName(), uid);
}
onMessage(networkEnvelope, this);
UserThread.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size));
EXECUTOR.execute(() -> onMessage(networkEnvelope, this));
EXECUTOR.execute(() -> connectionStatistics.addReceivedMsgMetrics(System.currentTimeMillis() - ts, size));
}
} catch (InvalidClassException e) {
log.error(e.getMessage());
@ -897,7 +908,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
EXECUTOR.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
}
});
return false;