diff --git a/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java b/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java index 537bffb5..ca6e0eed 100644 --- a/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java +++ b/core/src/main/java/haveno/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java @@ -28,6 +28,7 @@ import haveno.core.offer.availability.DisputeAgentSelection; import haveno.core.offer.messages.SignOfferRequest; import haveno.core.offer.placeoffer.PlaceOfferModel; import haveno.core.support.dispute.arbitration.arbitrator.Arbitrator; +import haveno.core.trade.HavenoUtils; import haveno.core.xmr.model.XmrAddressEntry; import haveno.network.p2p.AckMessage; import haveno.network.p2p.DecryptedDirectMessageListener; @@ -164,7 +165,8 @@ public class MakerSendSignOfferRequest extends Task { arbitratorNodeAddress, arbitrator.getPubKeyRing(), request, - listener + listener, + HavenoUtils.ARBITRATOR_ACK_TIMEOUT_SECONDS ); } } diff --git a/core/src/main/java/haveno/core/trade/HavenoUtils.java b/core/src/main/java/haveno/core/trade/HavenoUtils.java index 625cd483..5843811f 100644 --- a/core/src/main/java/haveno/core/trade/HavenoUtils.java +++ b/core/src/main/java/haveno/core/trade/HavenoUtils.java @@ -66,6 +66,7 @@ public class HavenoUtils { private static final String RELEASE_DATE = "01-03-2024 00:00:00"; // optionally set to release date of the network in format dd-mm-yyyy to impose temporary limits, etc. e.g. "01-03-2024 00:00:00" public static final int RELEASE_LIMIT_DAYS = 60; // number of days to limit sell offers to max buy limit for new accounts public static final int WARN_ON_OFFER_EXCEEDS_UNSIGNED_BUY_LIMIT_DAYS = 182; // number of days to warn if sell offer exceeds unsigned buy limit + public static final int ARBITRATOR_ACK_TIMEOUT_SECONDS = 15; // non-configurable public static final DecimalFormatSymbols DECIMAL_FORMAT_SYMBOLS = DecimalFormatSymbols.getInstance(Locale.US); // use the US locale as a base for all DecimalFormats (commas should be omitted from number strings) diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/TakerSendInitTradeRequestToArbitrator.java b/core/src/main/java/haveno/core/trade/protocol/tasks/TakerSendInitTradeRequestToArbitrator.java index 4cebead9..8fa8ea34 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/TakerSendInitTradeRequestToArbitrator.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/TakerSendInitTradeRequestToArbitrator.java @@ -23,6 +23,7 @@ import haveno.common.handlers.ResultHandler; import haveno.common.taskrunner.TaskRunner; import haveno.core.offer.availability.DisputeAgentSelection; import haveno.core.support.dispute.arbitration.arbitrator.Arbitrator; +import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; import haveno.core.trade.messages.InitTradeRequest; import haveno.network.p2p.NodeAddress; @@ -144,7 +145,8 @@ public class TakerSendInitTradeRequestToArbitrator extends TradeTask { arbitratorNodeAddress, arbitrator.getPubKeyRing(), arbitratorRequest, - listener + listener, + HavenoUtils.ARBITRATOR_ACK_TIMEOUT_SECONDS ); } } diff --git a/p2p/src/main/java/haveno/network/p2p/P2PService.java b/p2p/src/main/java/haveno/network/p2p/P2PService.java index ac244f72..be7b61df 100644 --- a/p2p/src/main/java/haveno/network/p2p/P2PService.java +++ b/p2p/src/main/java/haveno/network/p2p/P2PService.java @@ -382,12 +382,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // DirectMessages /////////////////////////////////////////////////////////////////////////////////////////// - // TODO OfferAvailabilityResponse is called twice! public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { + sendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener, null); + } + + public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, + SendDirectMessageListener sendDirectMessageListener, Integer timeoutSeconds) { checkNotNull(peerNodeAddress, "PeerAddress must not be null (sendEncryptedDirectMessage)"); if (isBootstrapped()) { - doSendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener); + doSendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener, timeoutSeconds); } else { throw new NetworkNotReadyException(); } @@ -396,7 +400,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, - SendDirectMessageListener sendDirectMessageListener) { + SendDirectMessageListener sendDirectMessageListener, + Integer timeoutSeconds) { log.debug("Send encrypted direct message {} to peer {}", message.getClass().getSimpleName(), peersNodeAddress); @@ -417,7 +422,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis networkNode.getNodeAddress(), encryptionService.encryptAndSign(pubKeyRing, message)); - SettableFuture future = networkNode.sendMessage(peersNodeAddress, sealedMsg); + SettableFuture future = networkNode.sendMessage(peersNodeAddress, sealedMsg, timeoutSeconds); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable Connection connection) { diff --git a/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java b/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java index be9d6089..d910fc2c 100644 --- a/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/haveno/network/p2p/network/NetworkNode.java @@ -48,6 +48,7 @@ import java.util.Date; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -120,6 +121,11 @@ public abstract class NetworkNode implements MessageListener { public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelope) { + return sendMessage(peersNodeAddress, networkEnvelope, null); + } + + public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, + NetworkEnvelope networkEnvelope, Integer timeoutSeconds) { log.debug("Send {} to {}. Message details: {}", networkEnvelope.getClass().getSimpleName(), peersNodeAddress, Utilities.toTruncatedString(networkEnvelope)); @@ -136,107 +142,109 @@ public abstract class NetworkNode implements MessageListener { log.debug("We have not found any connection for peerAddress {}.\n\t" + "We will create a new outbound connection.", peersNodeAddress); - SettableFuture resultFuture = SettableFuture.create(); - ListenableFuture future = connectionExecutor.submit(() -> { - Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-" - + Utilities.toTruncatedString(peersNodeAddress.getFullAddress(), 15)); - if (peersNodeAddress.equals(getNodeAddress())) { - log.warn("We are sending a message to ourselves"); - } - - OutboundConnection outboundConnection; - // can take a while when using tor - long startTs = System.currentTimeMillis(); - - log.debug("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress()); - - Socket socket = createSocket(peersNodeAddress); - long duration = System.currentTimeMillis() - startTs; - log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(), - duration); - - if (duration > CREATE_SOCKET_TIMEOUT) - throw new TimeoutException("A timeout occurred when creating a socket."); - - // Tor needs sometimes quite long to create a connection. To avoid that we get too many - // connections with the same peer we check again if we still don't have any connection for that node address. - Connection existingConnection = getInboundConnection(peersNodeAddress); - if (existingConnection == null) - existingConnection = getOutboundConnection(peersNodeAddress); - - if (existingConnection != null) { - log.debug("We found in the meantime a connection for peersNodeAddress {}, " + - "so we use that for sending the message.\n" + - "That can happen if Tor needs long for creating a new outbound connection.\n" + - "We might have got a new inbound or outbound connection.", - peersNodeAddress.getFullAddress()); - - try { - socket.close(); - } catch (Throwable throwable) { - if (!shutDownInProgress) { - log.error("Error at closing socket " + throwable); - } + SettableFuture resultFuture = SettableFuture.create(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-" + + Utilities.toTruncatedString(peersNodeAddress.getFullAddress(), 15)); + if (peersNodeAddress.equals(getNodeAddress())) { + log.warn("We are sending a message to ourselves"); } - existingConnection.sendMessage(networkEnvelope); - return existingConnection; - } else { - ConnectionListener connectionListener = new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - if (!connection.isStopped()) { - outBoundConnections.add((OutboundConnection) connection); - printOutBoundConnections(); - connectionListeners.forEach(e -> e.onConnection(connection)); + + OutboundConnection outboundConnection; + // can take a while when using tor + long startTs = System.currentTimeMillis(); + + log.debug("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress()); + + Socket socket = createSocket(peersNodeAddress); + long duration = System.currentTimeMillis() - startTs; + log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(), + duration); + + if (duration > CREATE_SOCKET_TIMEOUT) + throw new TimeoutException("A timeout occurred when creating a socket."); + + // Tor needs sometimes quite long to create a connection. To avoid that we get too many + // connections with the same peer we check again if we still don't have any connection for that node address. + Connection existingConnection = getInboundConnection(peersNodeAddress); + if (existingConnection == null) + existingConnection = getOutboundConnection(peersNodeAddress); + + if (existingConnection != null) { + log.debug("We found in the meantime a connection for peersNodeAddress {}, " + + "so we use that for sending the message.\n" + + "That can happen if Tor needs long for creating a new outbound connection.\n" + + "We might have got a new inbound or outbound connection.", + peersNodeAddress.getFullAddress()); + + try { + socket.close(); + } catch (Throwable throwable) { + if (!shutDownInProgress) { + log.error("Error at closing socket " + throwable); } } + existingConnection.sendMessage(networkEnvelope); + return existingConnection; + } else { + ConnectionListener connectionListener = new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + if (!connection.isStopped()) { + outBoundConnections.add((OutboundConnection) connection); + printOutBoundConnections(); + connectionListeners.forEach(e -> e.onConnection(connection)); + } + } - @Override - public void onDisconnect(CloseConnectionReason closeConnectionReason, - Connection connection) { - // noinspection SuspiciousMethodCalls - outBoundConnections.remove(connection); - printOutBoundConnections(); - connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection)); + @Override + public void onDisconnect(CloseConnectionReason closeConnectionReason, + Connection connection) { + // noinspection SuspiciousMethodCalls + outBoundConnections.remove(connection); + printOutBoundConnections(); + connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection)); + } + }; + outboundConnection = new OutboundConnection(socket, + NetworkNode.this, + connectionListener, + peersNodeAddress, + networkProtoResolver, + banFilter); + + if (log.isDebugEnabled()) { + log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + + "NetworkNode created new outbound connection:" + + "\nmyNodeAddress=" + getNodeAddress() + + "\npeersNodeAddress=" + peersNodeAddress + + "\nuid=" + outboundConnection.getUid() + + "\nmessage=" + networkEnvelope + + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); } - }; - outboundConnection = new OutboundConnection(socket, - NetworkNode.this, - connectionListener, - peersNodeAddress, - networkProtoResolver, - banFilter); - - if (log.isDebugEnabled()) { - log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + - "NetworkNode created new outbound connection:" - + "\nmyNodeAddress=" + getNodeAddress() - + "\npeersNodeAddress=" + peersNodeAddress - + "\nuid=" + outboundConnection.getUid() - + "\nmessage=" + networkEnvelope - + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); + // can take a while when using tor + outboundConnection.sendMessage(networkEnvelope); + return outboundConnection; } - // can take a while when using tor - outboundConnection.sendMessage(networkEnvelope); - return outboundConnection; + } catch (Exception e) { + throw new RuntimeException(e); } + }, connectionExecutor); + + // handle future with timeout + if (timeoutSeconds != null) future.orTimeout(timeoutSeconds, TimeUnit.SECONDS); + future.exceptionally(throwable -> { + log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString()); + UserThread.execute(() -> { + if (!resultFuture.setException(throwable)) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); + } + }); + return null; }); - - Futures.addCallback(future, new FutureCallback<>() { - public void onSuccess(Connection connection) { - UserThread.execute(() -> resultFuture.set(connection)); - } - - public void onFailure(@NotNull Throwable throwable) { - log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString()); - UserThread.execute(() -> { - if (!resultFuture.setException(throwable)) { - // In case the setException returns false we need to cancel the future. - resultFuture.cancel(true); - } - }); - } - }, MoreExecutors.directExecutor()); + future.thenAccept(resultFuture::set); return resultFuture; }