diff --git a/core/src/main/java/haveno/core/support/SupportManager.java b/core/src/main/java/haveno/core/support/SupportManager.java index 5c2e8b6f..ecbf5302 100644 --- a/core/src/main/java/haveno/core/support/SupportManager.java +++ b/core/src/main/java/haveno/core/support/SupportManager.java @@ -29,8 +29,6 @@ import haveno.core.support.messages.ChatMessage; import haveno.core.support.messages.SupportMessage; import haveno.core.trade.Trade; import haveno.core.trade.TradeManager; -import haveno.core.trade.protocol.TradeProtocol; -import haveno.core.trade.protocol.TradeProtocol.MailboxMessageComparator; import haveno.core.xmr.wallet.XmrWalletService; import haveno.network.p2p.AckMessage; import haveno.network.p2p.AckMessageSourceType; @@ -40,10 +38,10 @@ import haveno.network.p2p.P2PService; import haveno.network.p2p.SendMailboxMessageListener; import haveno.network.p2p.mailbox.MailboxMessage; import haveno.network.p2p.mailbox.MailboxMessageService; +import haveno.network.p2p.mailbox.MailboxMessageService.DecryptedMessageWithPubKeyComparator; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,9 +80,9 @@ public abstract class SupportManager { // We get first the message handler called then the onBootstrapped p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, senderAddress) -> { - if (isReady()) applyDirectMessage(decryptedMessageWithPubKey); - else { - synchronized (lock) { + synchronized (lock) { + if (isReady()) applyDirectMessage(decryptedMessageWithPubKey); + else { // As decryptedDirectMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey); tryApplyMessages(); @@ -92,9 +90,9 @@ public abstract class SupportManager { } }); mailboxMessageService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> { - if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey); - else { - synchronized (lock) { + synchronized (lock) { + if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey); + else { // As decryptedMailboxMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey); tryApplyMessages(); @@ -395,22 +393,4 @@ public abstract class SupportManager { mailboxMessageService.removeMailboxMsg(ackMessage); } } - - private static class DecryptedMessageWithPubKeyComparator implements Comparator { - - MailboxMessageComparator mailboxMessageComparator; - public DecryptedMessageWithPubKeyComparator() { - mailboxMessageComparator = new TradeProtocol.MailboxMessageComparator(); - } - - @Override - public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) { - if (m1.getNetworkEnvelope() instanceof MailboxMessage) { - if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope()); - else return 1; - } else { - return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0; - } - } - } } diff --git a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java index faed3a89..6a1f69c9 100644 --- a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java +++ b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java @@ -354,6 +354,7 @@ public abstract class DisputeManager> extends Sup disputeList.add(dispute); } + // create dispute opened message NodeAddress agentNodeAddress = getAgentNodeAddress(dispute); DisputeOpenedMessage disputeOpenedMessage = new DisputeOpenedMessage(dispute, p2PService.getAddress(), @@ -367,6 +368,8 @@ public abstract class DisputeManager> extends Sup disputeOpenedMessage.getTradeId(), disputeOpenedMessage.getUid(), chatMessage.getUid()); recordPendingMessage(disputeOpenedMessage.getClass().getSimpleName()); + + // send dispute opened message mailboxMessageService.sendEncryptedMailboxMessage(agentNodeAddress, dispute.getAgentPubKeyRing(), disputeOpenedMessage, @@ -436,7 +439,7 @@ public abstract class DisputeManager> extends Sup // arbitrator receives dispute opened message from opener, opener's peer receives from arbitrator protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) { Dispute dispute = message.getDispute(); - log.info("{}.onDisputeOpenedMessage() with trade {}, dispute {}", getClass().getSimpleName(), dispute.getTradeId(), dispute.getId()); + log.info("Processing {} with trade {}, dispute {}", message.getClass().getSimpleName(), dispute.getTradeId(), dispute.getId()); // get trade Trade trade = tradeManager.getTrade(dispute.getTradeId()); @@ -467,6 +470,7 @@ public abstract class DisputeManager> extends Sup DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress()); //DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList()); } catch (DisputeValidation.ValidationException e) { + e.printStackTrace(); validationExceptions.add(e); throw e; } @@ -476,6 +480,7 @@ public abstract class DisputeManager> extends Sup try { DisputeValidation.validatePaymentAccountPayload(dispute); } catch (Exception e) { + e.printStackTrace(); log.warn(e.getMessage()); trade.prependErrorMessage(e.getMessage()); } @@ -499,7 +504,7 @@ public abstract class DisputeManager> extends Sup // update multisig hex if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex()); - trade.importMultisigHex(); + if (trade.walletExists()) trade.importMultisigHex(); // add chat message with price info if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0); @@ -518,8 +523,7 @@ public abstract class DisputeManager> extends Sup errorMessage = null; } else { // valid case if both have opened a dispute and agent was not online - log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", - dispute.getTradeId()); + log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId()); } // add chat message with mediation info if applicable @@ -529,6 +533,7 @@ public abstract class DisputeManager> extends Sup } } } catch (Exception e) { + e.printStackTrace(); errorMessage = e.getMessage(); log.warn(errorMessage); if (trade != null) trade.setErrorMessage(errorMessage); diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index aadea08f..57908663 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -116,7 +116,7 @@ public final class ArbitrationManager extends DisputeManager { + HavenoUtils.runTask(message.getTradeId(), () -> { if (message instanceof DisputeOpenedMessage) { handleDisputeOpenedMessage((DisputeOpenedMessage) message); } else if (message instanceof ChatMessage) { @@ -126,7 +126,7 @@ public final class ArbitrationManager extends DisputeManager POOLS = new HashMap<>(); // TODO: better way to share references? public static ArbitrationManager arbitrationManager; @@ -467,6 +470,22 @@ public class HavenoUtils { } } + public static Future runTask(String threadId, Runnable task) { + synchronized (POOLS) { + if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1)); + return POOLS.get(threadId).submit(task); + } + } + + public static void removeThreadId(String threadId) { + synchronized (POOLS) { + if (POOLS.containsKey(threadId)) { + POOLS.get(threadId).shutdown(); + POOLS.remove(threadId); + } + } + } + /** * Submit tasks to a global thread pool. */ diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index a10389c0..b4f55dc0 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -54,6 +54,7 @@ import haveno.core.trade.messages.InitMultisigRequest; import haveno.core.trade.messages.InitTradeRequest; import haveno.core.trade.messages.SignContractRequest; import haveno.core.trade.messages.SignContractResponse; +import haveno.core.trade.messages.TradeMessage; import haveno.core.trade.protocol.ArbitratorProtocol; import haveno.core.trade.protocol.MakerProtocol; import haveno.core.trade.protocol.ProcessModel; @@ -232,7 +233,9 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi @Override public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) { NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); - new Thread(() -> { + if (!(networkEnvelope instanceof TradeMessage)) return; + String tradeId = ((TradeMessage) networkEnvelope).getTradeId(); + HavenoUtils.runTask(tradeId, () -> { if (networkEnvelope instanceof InitTradeRequest) { handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); } else if (networkEnvelope instanceof InitMultisigRequest) { @@ -246,7 +249,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } else if (networkEnvelope instanceof DepositResponse) { handleDepositResponse((DepositResponse) networkEnvelope, peer); } - }).start(); + }); } @@ -1202,6 +1205,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi // remove trade tradableList.remove(trade); + HavenoUtils.removeThreadId(trade.getId()); // unregister and persist p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade)); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 53db1196..2d075430 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -93,6 +93,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D private int reprocessPaymentReceivedMessageCount; + // set comparator for processing mailbox messages + static { + MailboxMessageService.setMailboxMessageComparator(new MailboxMessageComparator()); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// @@ -245,12 +250,14 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D if (!trade.isCompleted()) processModel.getP2PService().addDecryptedDirectMessageListener(this); // initialize trade - trade.initialize(processModel.getProvider()); + synchronized (trade) { + trade.initialize(processModel.getProvider()); - // process mailbox messages - MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService(); - if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this); - handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); + // process mailbox messages + MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService(); + if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this); + handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); + } // send deposits confirmed message if applicable maybeSendDepositsConfirmedMessage(); @@ -477,18 +484,18 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D handleTaskRunnerSuccess(peer, message); return; } + if (trade.getPayoutTx() != null) { + log.warn("We received a PaymentSentMessage but we have already created the payout tx " + + "so we ignore the message. This can happen if the ACK message to the peer did not " + + "arrive and the peer repeats sending us the message. We send another ACK msg."); + sendAckMessage(peer, message, true, null); + removeMailboxMessageAfterProcessing(message); + return; + } latchTrade(); expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED) .with(message) - .from(peer) - .preCondition(trade.getPayoutTx() == null, - () -> { - log.warn("We received a PaymentSentMessage but we have already created the payout tx " + - "so we ignore the message. This can happen if the ACK message to the peer did not " + - "arrive and the peer repeats sending us the message. We send another ACK msg."); - sendAckMessage(peer, message, true, null); - removeMailboxMessageAfterProcessing(message); - })) + .from(peer)) .setup(tasks( ApplyFilter.class, ProcessPaymentSentMessage.class, diff --git a/p2p/src/main/java/haveno/network/p2p/mailbox/MailboxMessageService.java b/p2p/src/main/java/haveno/network/p2p/mailbox/MailboxMessageService.java index 5d2ce5b7..09780edc 100644 --- a/p2p/src/main/java/haveno/network/p2p/mailbox/MailboxMessageService.java +++ b/p2p/src/main/java/haveno/network/p2p/mailbox/MailboxMessageService.java @@ -123,6 +123,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD private boolean isBootstrapped; private boolean allServicesInitialized; private boolean initAfterBootstrapped; + private static Comparator mailboxMessageComparator; @Inject public MailboxMessageService(NetworkNode networkNode, @@ -182,9 +183,11 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD // Those outdated messages would then stay in the network until TTL triggers removal. // By not applying large messages we reduce the impact of such cases at costs of extra loading costs if the message is still alive. if (serializedSize < MAX_SERIALIZED_SIZE) { - mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem); - mailboxMessageList.add(mailboxItem); - totalSize.getAndAdd(serializedSize); + synchronized (mailboxMessageList) { + mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem); + mailboxMessageList.add(mailboxItem); + totalSize.getAndAdd(serializedSize); + } // We add it to our map so that it get added to the excluded key set we send for // the initial data requests. So that helps to lower the load for mailbox messages at @@ -330,9 +333,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD */ public void removeMailboxMsg(MailboxMessage mailboxMessage) { if (isBootstrapped) { - // We need to delay a bit to not get a ConcurrentModificationException as we might iterate over - // mailboxMessageList while getting called. - UserThread.execute(() -> { + synchronized (mailboxMessageList) { String uid = mailboxMessage.getUid(); if (!mailboxItemsByUid.containsKey(uid)) { return; @@ -349,7 +350,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD // call removeMailboxItemFromMap here. The onRemoved only removes foreign mailBoxMessages. log.trace("## removeMailboxMsg uid={}", uid); removeMailboxItemFromLocalStore(uid); - }); + } } else { // In case the network was not ready yet we try again later UserThread.runAfter(() -> removeMailboxMsg(mailboxMessage), 30); @@ -397,7 +398,24 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD .forEach(this::removeMailboxItemFromLocalStore); } + public static void setMailboxMessageComparator(Comparator comparator) { + mailboxMessageComparator = comparator; + } + public static class DecryptedMessageWithPubKeyComparator implements Comparator { + + @Override + public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) { + if (m1.getNetworkEnvelope() instanceof MailboxMessage) { + if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope()); + else return 1; + } else { + return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0; + } + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// @@ -429,7 +447,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD Futures.addCallback(future, new FutureCallback<>() { public void onSuccess(Set decryptedMailboxMessageWithEntries) { - UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> handleMailboxItem(e))); + new Thread(() -> handleMailboxItems(decryptedMailboxMessageWithEntries)).start(); } public void onFailure(@NotNull Throwable throwable) { @@ -471,16 +489,42 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD return new MailboxItem(protectedMailboxStorageEntry, null); } + private void handleMailboxItems(Set mailboxItems) { + + // sort mailbox items + List mailboxItemsSorted = mailboxItems.stream() + .filter(e -> !e.isMine()) + .collect(Collectors.toList()); + mailboxItemsSorted.addAll(mailboxItems.stream() + .filter(e -> e.isMine()) + .sorted(new MailboxItemComparator()) + .collect(Collectors.toList())); + + // handle mailbox items + mailboxItemsSorted.forEach(e -> handleMailboxItem(e)); + } + + private static class MailboxItemComparator implements Comparator { + private DecryptedMessageWithPubKeyComparator comparator = new DecryptedMessageWithPubKeyComparator(); + + @Override + public int compare(MailboxItem m1, MailboxItem m2) { + return comparator.compare(m1.getDecryptedMessageWithPubKey(), m2.getDecryptedMessageWithPubKey()); + } + } + private void handleMailboxItem(MailboxItem mailboxItem) { String uid = mailboxItem.getUid(); - if (!mailboxItemsByUid.containsKey(uid)) { - mailboxItemsByUid.put(uid, mailboxItem); - mailboxMessageList.add(mailboxItem); - log.trace("## handleMailboxItem uid={}\nhash={}", - uid, - P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload())); + synchronized (mailboxMessageList) { + if (!mailboxItemsByUid.containsKey(uid)) { + mailboxItemsByUid.put(uid, mailboxItem); + mailboxMessageList.add(mailboxItem); + log.trace("## handleMailboxItem uid={}\nhash={}", + uid, + P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload())); - requestPersistence(); + requestPersistence(); + } } // In case we had the item already stored we still prefer to apply it again to the domain.