diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index f25bd11e..4a619932 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -344,21 +344,22 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe int size = openOffers.size(); log.info("Remove open offers at shutDown. Number of open offers: {}", size); if (offerBookService.isBootstrapped() && size > 0) { - UserThread.execute(() -> openOffers.forEach( - openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()) - )); + UserThread.execute(() -> { + openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())); - // Force broadcaster to send out immediately, otherwise we could have a 2 sec delay until the - // bundled messages sent out. - broadcaster.flush(); + // Force broadcaster to send out immediately, otherwise we could have a 2 sec delay until the + // bundled messages sent out. + broadcaster.flush(); - if (completeHandler != null) { - // For typical number of offers we are tolerant with delay to give enough time to broadcast. - // If number of offers is very high we limit to 3 sec. to not delay other shutdown routines. - int delay = Math.min(3000, size * 200 + 500); - UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS); - } + if (completeHandler != null) { + // For typical number of offers we are tolerant with delay to give enough time to broadcast. + // If number of offers is very high we limit to 3 sec. to not delay other shutdown routines. + int delay = Math.min(3000, size * 200 + 500); + UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS); + } + }); } else { + broadcaster.flush(); if (completeHandler != null) completeHandler.run(); } diff --git a/core/src/main/java/haveno/core/provider/price/PriceRequest.java b/core/src/main/java/haveno/core/provider/price/PriceRequest.java index 5bbb707a..ad6070fa 100644 --- a/core/src/main/java/haveno/core/provider/price/PriceRequest.java +++ b/core/src/main/java/haveno/core/provider/price/PriceRequest.java @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public class PriceRequest { - private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60); + private final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60); @Nullable private PriceProvider provider; private boolean shutDownRequested; diff --git a/p2p/src/main/java/haveno/network/p2p/P2PService.java b/p2p/src/main/java/haveno/network/p2p/P2PService.java index 5d3fb7cd..c517dc86 100644 --- a/p2p/src/main/java/haveno/network/p2p/P2PService.java +++ b/p2p/src/main/java/haveno/network/p2p/P2PService.java @@ -144,15 +144,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // We need to have both the initial data delivered and the hidden service published networkReadyBinding = EasyBind.combine(hiddenServicePublished, preliminaryDataReceived, - (hiddenServicePublished, preliminaryDataReceived) - -> hiddenServicePublished && preliminaryDataReceived); + (hiddenServicePublished, preliminaryDataReceived) -> hiddenServicePublished && preliminaryDataReceived); networkReadySubscription = networkReadyBinding.subscribe((observable, oldValue, newValue) -> { if (newValue) onNetworkReady(); }); } - /////////////////////////////////////////////////////////////////////////////////////////// // API /////////////////////////////////////////////////////////////////////////////////////////// @@ -178,6 +176,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } public void shutDown(Runnable shutDownCompleteHandler) { + log.info("P2PService shutdown started"); shutDownResultHandlers.add(shutDownCompleteHandler); // We need to make sure queued up messages are flushed out before we continue shut down other network @@ -216,15 +215,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } if (networkNode != null) { - networkNode.shutDown(() -> { - shutDownResultHandlers.forEach(Runnable::run); - }); + networkNode.shutDown(() -> shutDownResultHandlers.forEach(Runnable::run)); } else { shutDownResultHandlers.forEach(Runnable::run); } } - /** * Startup sequence: *

@@ -289,7 +285,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis UserThread.runAfter(peerExchangeManager::initialRequestPeersFromReportedOrPersistedPeers, 300, TimeUnit.MILLISECONDS); } - /////////////////////////////////////////////////////////////////////////////////////////// // RequestDataManager.Listener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -338,7 +333,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -357,7 +351,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3); } - /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -370,13 +363,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned()); connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope()); connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> - decryptedDirectMessageListeners.forEach(e -> { - try { - e.onDirectMessage(decryptedMsg, nodeAddress); - } catch (Exception e2) { - e2.printStackTrace(); - } - }), + decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)), () -> { log.error("peersNodeAddress is expected to be available at onMessage for " + "processing PrefixedSealedAndSignedMessage."); @@ -391,7 +378,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - /////////////////////////////////////////////////////////////////////////////////////////// // DirectMessages /////////////////////////////////////////////////////////////////////////////////////////// @@ -453,7 +439,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - /////////////////////////////////////////////////////////////////////////////////////////// // Data storage /////////////////////////////////////////////////////////////////////////////////////////// @@ -504,7 +489,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - /////////////////////////////////////////////////////////////////////////////////////////// // Listeners /////////////////////////////////////////////////////////////////////////////////////////// @@ -533,7 +517,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis p2PDataStorage.removeHashMapChangedListener(hashMapChangedListener); } - /////////////////////////////////////////////////////////////////////////////////////////// // Getters /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java b/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java index 1b9f54dc..16ad2341 100644 --- a/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java @@ -114,8 +114,7 @@ public abstract class NetworkNode implements MessageListener { // API /////////////////////////////////////////////////////////////////////////////////////////// - // Calls this (and other registered) setup listener's ``onTorNodeReady()`` and - // ``onHiddenServicePublished`` + // Calls this (and other registered) setup listener's ``onTorNodeReady()`` and ``onHiddenServicePublished`` // when the events happen. public abstract void start(@Nullable SetupListener setupListener); @@ -159,10 +158,8 @@ public abstract class NetworkNode implements MessageListener { if (duration > CREATE_SOCKET_TIMEOUT) throw new TimeoutException("A timeout occurred when creating a socket."); - // Tor needs sometimes quite long to create a connection. To avoid that we get - // too many - // connections with the same peer we check again if we still don't have any - // connection for that node address. + // Tor needs sometimes quite long to create a connection. To avoid that we get too many + // connections with the same peer we check again if we still don't have any connection for that node address. Connection existingConnection = getInboundConnection(peersNodeAddress); if (existingConnection == null) existingConnection = getOutboundConnection(peersNodeAddress); @@ -296,9 +293,7 @@ public abstract class NetworkNode implements MessageListener { SettableFuture resultFuture = SettableFuture.create(); try { ListenableFuture future = executor.submit(() -> { - String id = connection.getPeersNodeAddressOptional().isPresent() ? - connection.getPeersNodeAddressOptional().get().getFullAddress() : - connection.getUid(); + String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid(); Thread.currentThread().setName("NetworkNode:SendMessage-to-" + Utilities.toTruncatedString(id, 15)); connection.sendMessage(networkEnvelope);