fixes after updating connection and message packages

This commit is contained in:
woodser 2023-04-25 14:35:25 -04:00
parent e0db4528da
commit 3f7489269f
4 changed files with 22 additions and 43 deletions

View file

@ -344,21 +344,22 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
int size = openOffers.size(); int size = openOffers.size();
log.info("Remove open offers at shutDown. Number of open offers: {}", size); log.info("Remove open offers at shutDown. Number of open offers: {}", size);
if (offerBookService.isBootstrapped() && size > 0) { if (offerBookService.isBootstrapped() && size > 0) {
UserThread.execute(() -> openOffers.forEach( UserThread.execute(() -> {
openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()) openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()));
));
// Force broadcaster to send out immediately, otherwise we could have a 2 sec delay until the // Force broadcaster to send out immediately, otherwise we could have a 2 sec delay until the
// bundled messages sent out. // bundled messages sent out.
broadcaster.flush(); broadcaster.flush();
if (completeHandler != null) { if (completeHandler != null) {
// For typical number of offers we are tolerant with delay to give enough time to broadcast. // For typical number of offers we are tolerant with delay to give enough time to broadcast.
// If number of offers is very high we limit to 3 sec. to not delay other shutdown routines. // If number of offers is very high we limit to 3 sec. to not delay other shutdown routines.
int delay = Math.min(3000, size * 200 + 500); int delay = Math.min(3000, size * 200 + 500);
UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS); UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS);
} }
});
} else { } else {
broadcaster.flush();
if (completeHandler != null) if (completeHandler != null)
completeHandler.run(); completeHandler.run();
} }

View file

@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class PriceRequest { public class PriceRequest {
private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60); private final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60);
@Nullable @Nullable
private PriceProvider provider; private PriceProvider provider;
private boolean shutDownRequested; private boolean shutDownRequested;

View file

@ -144,15 +144,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// We need to have both the initial data delivered and the hidden service published // We need to have both the initial data delivered and the hidden service published
networkReadyBinding = EasyBind.combine(hiddenServicePublished, preliminaryDataReceived, networkReadyBinding = EasyBind.combine(hiddenServicePublished, preliminaryDataReceived,
(hiddenServicePublished, preliminaryDataReceived) (hiddenServicePublished, preliminaryDataReceived) -> hiddenServicePublished && preliminaryDataReceived);
-> hiddenServicePublished && preliminaryDataReceived);
networkReadySubscription = networkReadyBinding.subscribe((observable, oldValue, newValue) -> { networkReadySubscription = networkReadyBinding.subscribe((observable, oldValue, newValue) -> {
if (newValue) if (newValue)
onNetworkReady(); onNetworkReady();
}); });
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -178,6 +176,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
public void shutDown(Runnable shutDownCompleteHandler) { public void shutDown(Runnable shutDownCompleteHandler) {
log.info("P2PService shutdown started");
shutDownResultHandlers.add(shutDownCompleteHandler); shutDownResultHandlers.add(shutDownCompleteHandler);
// We need to make sure queued up messages are flushed out before we continue shut down other network // We need to make sure queued up messages are flushed out before we continue shut down other network
@ -216,15 +215,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
if (networkNode != null) { if (networkNode != null) {
networkNode.shutDown(() -> { networkNode.shutDown(() -> shutDownResultHandlers.forEach(Runnable::run));
shutDownResultHandlers.forEach(Runnable::run);
});
} else { } else {
shutDownResultHandlers.forEach(Runnable::run); shutDownResultHandlers.forEach(Runnable::run);
} }
} }
/** /**
* Startup sequence: * Startup sequence:
* <p/> * <p/>
@ -289,7 +285,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
UserThread.runAfter(peerExchangeManager::initialRequestPeersFromReportedOrPersistedPeers, 300, TimeUnit.MILLISECONDS); UserThread.runAfter(peerExchangeManager::initialRequestPeersFromReportedOrPersistedPeers, 300, TimeUnit.MILLISECONDS);
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// RequestDataManager.Listener implementation // RequestDataManager.Listener implementation
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -338,7 +333,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation // ConnectionListener implementation
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -357,7 +351,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3); UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3);
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation // MessageListener implementation
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -370,13 +363,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned()); DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope()); connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope());
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
decryptedDirectMessageListeners.forEach(e -> { decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
try {
e.onDirectMessage(decryptedMsg, nodeAddress);
} catch (Exception e2) {
e2.printStackTrace();
}
}),
() -> { () -> {
log.error("peersNodeAddress is expected to be available at onMessage for " + log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage."); "processing PrefixedSealedAndSignedMessage.");
@ -391,7 +378,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// DirectMessages // DirectMessages
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -453,7 +439,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Data storage // Data storage
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -504,7 +489,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Listeners // Listeners
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -533,7 +517,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
p2PDataStorage.removeHashMapChangedListener(hashMapChangedListener); p2PDataStorage.removeHashMapChangedListener(hashMapChangedListener);
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Getters // Getters
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -114,8 +114,7 @@ public abstract class NetworkNode implements MessageListener {
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Calls this (and other registered) setup listener's ``onTorNodeReady()`` and // Calls this (and other registered) setup listener's ``onTorNodeReady()`` and ``onHiddenServicePublished``
// ``onHiddenServicePublished``
// when the events happen. // when the events happen.
public abstract void start(@Nullable SetupListener setupListener); public abstract void start(@Nullable SetupListener setupListener);
@ -159,10 +158,8 @@ public abstract class NetworkNode implements MessageListener {
if (duration > CREATE_SOCKET_TIMEOUT) if (duration > CREATE_SOCKET_TIMEOUT)
throw new TimeoutException("A timeout occurred when creating a socket."); throw new TimeoutException("A timeout occurred when creating a socket.");
// Tor needs sometimes quite long to create a connection. To avoid that we get // Tor needs sometimes quite long to create a connection. To avoid that we get too many
// too many // connections with the same peer we check again if we still don't have any connection for that node address.
// connections with the same peer we check again if we still don't have any
// connection for that node address.
Connection existingConnection = getInboundConnection(peersNodeAddress); Connection existingConnection = getInboundConnection(peersNodeAddress);
if (existingConnection == null) if (existingConnection == null)
existingConnection = getOutboundConnection(peersNodeAddress); existingConnection = getOutboundConnection(peersNodeAddress);
@ -296,9 +293,7 @@ public abstract class NetworkNode implements MessageListener {
SettableFuture<Connection> resultFuture = SettableFuture.create(); SettableFuture<Connection> resultFuture = SettableFuture.create();
try { try {
ListenableFuture<Connection> future = executor.submit(() -> { ListenableFuture<Connection> future = executor.submit(() -> {
String id = connection.getPeersNodeAddressOptional().isPresent() ? String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
connection.getPeersNodeAddressOptional().get().getFullAddress() :
connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + Utilities.toTruncatedString(id, 15)); Thread.currentThread().setName("NetworkNode:SendMessage-to-" + Utilities.toTruncatedString(id, 15));
connection.sendMessage(networkEnvelope); connection.sendMessage(networkEnvelope);