apply timeout for arbitrator to sign offer and init trade

This commit is contained in:
woodser 2024-03-22 11:41:07 -04:00
parent 10a5b55dfe
commit db12f1c2cb
5 changed files with 117 additions and 99 deletions

View file

@ -28,6 +28,7 @@ import haveno.core.offer.availability.DisputeAgentSelection;
import haveno.core.offer.messages.SignOfferRequest; import haveno.core.offer.messages.SignOfferRequest;
import haveno.core.offer.placeoffer.PlaceOfferModel; import haveno.core.offer.placeoffer.PlaceOfferModel;
import haveno.core.support.dispute.arbitration.arbitrator.Arbitrator; import haveno.core.support.dispute.arbitration.arbitrator.Arbitrator;
import haveno.core.trade.HavenoUtils;
import haveno.core.xmr.model.XmrAddressEntry; import haveno.core.xmr.model.XmrAddressEntry;
import haveno.network.p2p.AckMessage; import haveno.network.p2p.AckMessage;
import haveno.network.p2p.DecryptedDirectMessageListener; import haveno.network.p2p.DecryptedDirectMessageListener;
@ -164,7 +165,8 @@ public class MakerSendSignOfferRequest extends Task<PlaceOfferModel> {
arbitratorNodeAddress, arbitratorNodeAddress,
arbitrator.getPubKeyRing(), arbitrator.getPubKeyRing(),
request, request,
listener listener,
HavenoUtils.ARBITRATOR_ACK_TIMEOUT_SECONDS
); );
} }
} }

View file

@ -66,6 +66,7 @@ public class HavenoUtils {
private static final String RELEASE_DATE = "01-03-2024 00:00:00"; // optionally set to release date of the network in format dd-mm-yyyy to impose temporary limits, etc. e.g. "01-03-2024 00:00:00" private static final String RELEASE_DATE = "01-03-2024 00:00:00"; // optionally set to release date of the network in format dd-mm-yyyy to impose temporary limits, etc. e.g. "01-03-2024 00:00:00"
public static final int RELEASE_LIMIT_DAYS = 60; // number of days to limit sell offers to max buy limit for new accounts public static final int RELEASE_LIMIT_DAYS = 60; // number of days to limit sell offers to max buy limit for new accounts
public static final int WARN_ON_OFFER_EXCEEDS_UNSIGNED_BUY_LIMIT_DAYS = 182; // number of days to warn if sell offer exceeds unsigned buy limit public static final int WARN_ON_OFFER_EXCEEDS_UNSIGNED_BUY_LIMIT_DAYS = 182; // number of days to warn if sell offer exceeds unsigned buy limit
public static final int ARBITRATOR_ACK_TIMEOUT_SECONDS = 15;
// non-configurable // non-configurable
public static final DecimalFormatSymbols DECIMAL_FORMAT_SYMBOLS = DecimalFormatSymbols.getInstance(Locale.US); // use the US locale as a base for all DecimalFormats (commas should be omitted from number strings) public static final DecimalFormatSymbols DECIMAL_FORMAT_SYMBOLS = DecimalFormatSymbols.getInstance(Locale.US); // use the US locale as a base for all DecimalFormats (commas should be omitted from number strings)

View file

@ -23,6 +23,7 @@ import haveno.common.handlers.ResultHandler;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.offer.availability.DisputeAgentSelection; import haveno.core.offer.availability.DisputeAgentSelection;
import haveno.core.support.dispute.arbitration.arbitrator.Arbitrator; import haveno.core.support.dispute.arbitration.arbitrator.Arbitrator;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.core.trade.messages.InitTradeRequest; import haveno.core.trade.messages.InitTradeRequest;
import haveno.network.p2p.NodeAddress; import haveno.network.p2p.NodeAddress;
@ -144,7 +145,8 @@ public class TakerSendInitTradeRequestToArbitrator extends TradeTask {
arbitratorNodeAddress, arbitratorNodeAddress,
arbitrator.getPubKeyRing(), arbitrator.getPubKeyRing(),
arbitratorRequest, arbitratorRequest,
listener listener,
HavenoUtils.ARBITRATOR_ACK_TIMEOUT_SECONDS
); );
} }
} }

View file

@ -382,12 +382,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// DirectMessages // DirectMessages
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// TODO OfferAvailabilityResponse is called twice!
public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message,
SendDirectMessageListener sendDirectMessageListener) { SendDirectMessageListener sendDirectMessageListener) {
sendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener, null);
}
public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message,
SendDirectMessageListener sendDirectMessageListener, Integer timeoutSeconds) {
checkNotNull(peerNodeAddress, "PeerAddress must not be null (sendEncryptedDirectMessage)"); checkNotNull(peerNodeAddress, "PeerAddress must not be null (sendEncryptedDirectMessage)");
if (isBootstrapped()) { if (isBootstrapped()) {
doSendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener); doSendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener, timeoutSeconds);
} else { } else {
throw new NetworkNotReadyException(); throw new NetworkNotReadyException();
} }
@ -396,7 +400,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress,
PubKeyRing pubKeyRing, PubKeyRing pubKeyRing,
NetworkEnvelope message, NetworkEnvelope message,
SendDirectMessageListener sendDirectMessageListener) { SendDirectMessageListener sendDirectMessageListener,
Integer timeoutSeconds) {
log.debug("Send encrypted direct message {} to peer {}", log.debug("Send encrypted direct message {} to peer {}",
message.getClass().getSimpleName(), peersNodeAddress); message.getClass().getSimpleName(), peersNodeAddress);
@ -417,7 +422,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
networkNode.getNodeAddress(), networkNode.getNodeAddress(),
encryptionService.encryptAndSign(pubKeyRing, message)); encryptionService.encryptAndSign(pubKeyRing, message));
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, sealedMsg); SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, sealedMsg, timeoutSeconds);
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(@Nullable Connection connection) {

View file

@ -48,6 +48,7 @@ import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -120,6 +121,11 @@ public abstract class NetworkNode implements MessageListener {
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress,
NetworkEnvelope networkEnvelope) { NetworkEnvelope networkEnvelope) {
return sendMessage(peersNodeAddress, networkEnvelope, null);
}
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress,
NetworkEnvelope networkEnvelope, Integer timeoutSeconds) {
log.debug("Send {} to {}. Message details: {}", log.debug("Send {} to {}. Message details: {}",
networkEnvelope.getClass().getSimpleName(), peersNodeAddress, networkEnvelope.getClass().getSimpleName(), peersNodeAddress,
Utilities.toTruncatedString(networkEnvelope)); Utilities.toTruncatedString(networkEnvelope));
@ -136,107 +142,109 @@ public abstract class NetworkNode implements MessageListener {
log.debug("We have not found any connection for peerAddress {}.\n\t" + log.debug("We have not found any connection for peerAddress {}.\n\t" +
"We will create a new outbound connection.", peersNodeAddress); "We will create a new outbound connection.", peersNodeAddress);
SettableFuture<Connection> resultFuture = SettableFuture.create(); SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = connectionExecutor.submit(() -> { CompletableFuture<Connection> future = CompletableFuture.supplyAsync(() -> {
Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-" try {
+ Utilities.toTruncatedString(peersNodeAddress.getFullAddress(), 15)); Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-"
if (peersNodeAddress.equals(getNodeAddress())) { + Utilities.toTruncatedString(peersNodeAddress.getFullAddress(), 15));
log.warn("We are sending a message to ourselves"); if (peersNodeAddress.equals(getNodeAddress())) {
} log.warn("We are sending a message to ourselves");
OutboundConnection outboundConnection;
// can take a while when using tor
long startTs = System.currentTimeMillis();
log.debug("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress());
Socket socket = createSocket(peersNodeAddress);
long duration = System.currentTimeMillis() - startTs;
log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(),
duration);
if (duration > CREATE_SOCKET_TIMEOUT)
throw new TimeoutException("A timeout occurred when creating a socket.");
// Tor needs sometimes quite long to create a connection. To avoid that we get too many
// connections with the same peer we check again if we still don't have any connection for that node address.
Connection existingConnection = getInboundConnection(peersNodeAddress);
if (existingConnection == null)
existingConnection = getOutboundConnection(peersNodeAddress);
if (existingConnection != null) {
log.debug("We found in the meantime a connection for peersNodeAddress {}, " +
"so we use that for sending the message.\n" +
"That can happen if Tor needs long for creating a new outbound connection.\n" +
"We might have got a new inbound or outbound connection.",
peersNodeAddress.getFullAddress());
try {
socket.close();
} catch (Throwable throwable) {
if (!shutDownInProgress) {
log.error("Error at closing socket " + throwable);
}
} }
existingConnection.sendMessage(networkEnvelope);
return existingConnection; OutboundConnection outboundConnection;
} else { // can take a while when using tor
ConnectionListener connectionListener = new ConnectionListener() { long startTs = System.currentTimeMillis();
@Override
public void onConnection(Connection connection) { log.debug("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress());
if (!connection.isStopped()) {
outBoundConnections.add((OutboundConnection) connection); Socket socket = createSocket(peersNodeAddress);
printOutBoundConnections(); long duration = System.currentTimeMillis() - startTs;
connectionListeners.forEach(e -> e.onConnection(connection)); log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(),
duration);
if (duration > CREATE_SOCKET_TIMEOUT)
throw new TimeoutException("A timeout occurred when creating a socket.");
// Tor needs sometimes quite long to create a connection. To avoid that we get too many
// connections with the same peer we check again if we still don't have any connection for that node address.
Connection existingConnection = getInboundConnection(peersNodeAddress);
if (existingConnection == null)
existingConnection = getOutboundConnection(peersNodeAddress);
if (existingConnection != null) {
log.debug("We found in the meantime a connection for peersNodeAddress {}, " +
"so we use that for sending the message.\n" +
"That can happen if Tor needs long for creating a new outbound connection.\n" +
"We might have got a new inbound or outbound connection.",
peersNodeAddress.getFullAddress());
try {
socket.close();
} catch (Throwable throwable) {
if (!shutDownInProgress) {
log.error("Error at closing socket " + throwable);
} }
} }
existingConnection.sendMessage(networkEnvelope);
return existingConnection;
} else {
ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
if (!connection.isStopped()) {
outBoundConnections.add((OutboundConnection) connection);
printOutBoundConnections();
connectionListeners.forEach(e -> e.onConnection(connection));
}
}
@Override @Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, public void onDisconnect(CloseConnectionReason closeConnectionReason,
Connection connection) { Connection connection) {
// noinspection SuspiciousMethodCalls // noinspection SuspiciousMethodCalls
outBoundConnections.remove(connection); outBoundConnections.remove(connection);
printOutBoundConnections(); printOutBoundConnections();
connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection)); connectionListeners.forEach(e -> e.onDisconnect(closeConnectionReason, connection));
}
};
outboundConnection = new OutboundConnection(socket,
NetworkNode.this,
connectionListener,
peersNodeAddress,
networkProtoResolver,
banFilter);
if (log.isDebugEnabled()) {
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + networkEnvelope
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
} }
}; // can take a while when using tor
outboundConnection = new OutboundConnection(socket, outboundConnection.sendMessage(networkEnvelope);
NetworkNode.this, return outboundConnection;
connectionListener,
peersNodeAddress,
networkProtoResolver,
banFilter);
if (log.isDebugEnabled()) {
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + networkEnvelope
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
} }
// can take a while when using tor } catch (Exception e) {
outboundConnection.sendMessage(networkEnvelope); throw new RuntimeException(e);
return outboundConnection;
} }
}, connectionExecutor);
// handle future with timeout
if (timeoutSeconds != null) future.orTimeout(timeoutSeconds, TimeUnit.SECONDS);
future.exceptionally(throwable -> {
log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString());
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
return null;
}); });
future.thenAccept(resultFuture::set);
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> resultFuture.set(connection));
}
public void onFailure(@NotNull Throwable throwable) {
log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString());
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());
return resultFuture; return resultFuture;
} }