process mailbox messages in sequential order per trade

This commit is contained in:
woodser 2023-12-04 10:27:32 -05:00
parent da8474a0f4
commit 846a8634e5
7 changed files with 135 additions and 73 deletions

View file

@ -29,8 +29,6 @@ import haveno.core.support.messages.ChatMessage;
import haveno.core.support.messages.SupportMessage; import haveno.core.support.messages.SupportMessage;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.core.trade.TradeManager; import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.TradeProtocol;
import haveno.core.trade.protocol.TradeProtocol.MailboxMessageComparator;
import haveno.core.xmr.wallet.XmrWalletService; import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessage; import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType; import haveno.network.p2p.AckMessageSourceType;
@ -40,10 +38,10 @@ import haveno.network.p2p.P2PService;
import haveno.network.p2p.SendMailboxMessageListener; import haveno.network.p2p.SendMailboxMessageListener;
import haveno.network.p2p.mailbox.MailboxMessage; import haveno.network.p2p.mailbox.MailboxMessage;
import haveno.network.p2p.mailbox.MailboxMessageService; import haveno.network.p2p.mailbox.MailboxMessageService;
import haveno.network.p2p.mailbox.MailboxMessageService.DecryptedMessageWithPubKeyComparator;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -82,9 +80,9 @@ public abstract class SupportManager {
// We get first the message handler called then the onBootstrapped // We get first the message handler called then the onBootstrapped
p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, senderAddress) -> { p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, senderAddress) -> {
if (isReady()) applyDirectMessage(decryptedMessageWithPubKey); synchronized (lock) {
else { if (isReady()) applyDirectMessage(decryptedMessageWithPubKey);
synchronized (lock) { else {
// As decryptedDirectMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored // As decryptedDirectMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey); decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
tryApplyMessages(); tryApplyMessages();
@ -92,9 +90,9 @@ public abstract class SupportManager {
} }
}); });
mailboxMessageService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> { mailboxMessageService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> {
if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey); synchronized (lock) {
else { if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey);
synchronized (lock) { else {
// As decryptedMailboxMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored // As decryptedMailboxMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey); decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
tryApplyMessages(); tryApplyMessages();
@ -395,22 +393,4 @@ public abstract class SupportManager {
mailboxMessageService.removeMailboxMsg(ackMessage); mailboxMessageService.removeMailboxMsg(ackMessage);
} }
} }
private static class DecryptedMessageWithPubKeyComparator implements Comparator<DecryptedMessageWithPubKey> {
MailboxMessageComparator mailboxMessageComparator;
public DecryptedMessageWithPubKeyComparator() {
mailboxMessageComparator = new TradeProtocol.MailboxMessageComparator();
}
@Override
public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) {
if (m1.getNetworkEnvelope() instanceof MailboxMessage) {
if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope());
else return 1;
} else {
return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0;
}
}
}
} }

View file

@ -354,6 +354,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
disputeList.add(dispute); disputeList.add(dispute);
} }
// create dispute opened message
NodeAddress agentNodeAddress = getAgentNodeAddress(dispute); NodeAddress agentNodeAddress = getAgentNodeAddress(dispute);
DisputeOpenedMessage disputeOpenedMessage = new DisputeOpenedMessage(dispute, DisputeOpenedMessage disputeOpenedMessage = new DisputeOpenedMessage(dispute,
p2PService.getAddress(), p2PService.getAddress(),
@ -367,6 +368,8 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
disputeOpenedMessage.getTradeId(), disputeOpenedMessage.getUid(), disputeOpenedMessage.getTradeId(), disputeOpenedMessage.getUid(),
chatMessage.getUid()); chatMessage.getUid());
recordPendingMessage(disputeOpenedMessage.getClass().getSimpleName()); recordPendingMessage(disputeOpenedMessage.getClass().getSimpleName());
// send dispute opened message
mailboxMessageService.sendEncryptedMailboxMessage(agentNodeAddress, mailboxMessageService.sendEncryptedMailboxMessage(agentNodeAddress,
dispute.getAgentPubKeyRing(), dispute.getAgentPubKeyRing(),
disputeOpenedMessage, disputeOpenedMessage,
@ -436,7 +439,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// arbitrator receives dispute opened message from opener, opener's peer receives from arbitrator // arbitrator receives dispute opened message from opener, opener's peer receives from arbitrator
protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) { protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
Dispute dispute = message.getDispute(); Dispute dispute = message.getDispute();
log.info("{}.onDisputeOpenedMessage() with trade {}, dispute {}", getClass().getSimpleName(), dispute.getTradeId(), dispute.getId()); log.info("Processing {} with trade {}, dispute {}", message.getClass().getSimpleName(), dispute.getTradeId(), dispute.getId());
// get trade // get trade
Trade trade = tradeManager.getTrade(dispute.getTradeId()); Trade trade = tradeManager.getTrade(dispute.getTradeId());
@ -467,6 +470,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress()); DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress());
//DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList()); //DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList());
} catch (DisputeValidation.ValidationException e) { } catch (DisputeValidation.ValidationException e) {
e.printStackTrace();
validationExceptions.add(e); validationExceptions.add(e);
throw e; throw e;
} }
@ -476,6 +480,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
try { try {
DisputeValidation.validatePaymentAccountPayload(dispute); DisputeValidation.validatePaymentAccountPayload(dispute);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage()); log.warn(e.getMessage());
trade.prependErrorMessage(e.getMessage()); trade.prependErrorMessage(e.getMessage());
} }
@ -499,7 +504,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// update multisig hex // update multisig hex
if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex()); if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex());
trade.importMultisigHex(); if (trade.walletExists()) trade.importMultisigHex();
// add chat message with price info // add chat message with price info
if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0); if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0);
@ -518,8 +523,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
errorMessage = null; errorMessage = null;
} else { } else {
// valid case if both have opened a dispute and agent was not online // valid case if both have opened a dispute and agent was not online
log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId());
dispute.getTradeId());
} }
// add chat message with mediation info if applicable // add chat message with mediation info if applicable
@ -529,6 +533,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
} }
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
errorMessage = e.getMessage(); errorMessage = e.getMessage();
log.warn(errorMessage); log.warn(errorMessage);
if (trade != null) trade.setErrorMessage(errorMessage); if (trade != null) trade.setErrorMessage(errorMessage);

View file

@ -116,7 +116,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
log.info("Received {} from {} with tradeId {} and uid {}", log.info("Received {} from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getSenderNodeAddress(), message.getTradeId(), message.getUid()); message.getClass().getSimpleName(), message.getSenderNodeAddress(), message.getTradeId(), message.getUid());
new Thread(() -> { HavenoUtils.runTask(message.getTradeId(), () -> {
if (message instanceof DisputeOpenedMessage) { if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message); handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) { } else if (message instanceof ChatMessage) {
@ -126,7 +126,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
} else { } else {
log.warn("Unsupported message at dispatchMessage. message={}", message); log.warn("Unsupported message at dispatchMessage. message={}", message);
} }
}).start(); });
} }
} }
@ -221,7 +221,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
String summaryText = chatMessage.getMessage(); String summaryText = chatMessage.getMessage();
if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId())); if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId()));
if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring
else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail is arbitrator is unregistered) else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail if arbitrator is unregistered)
// save dispute closed message for reprocessing // save dispute closed message for reprocessing
trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage); trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage);
@ -253,11 +253,9 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
trade.saveWallet(); trade.saveWallet();
} }
// import multisig hex // update multisig hex
if (trade.walletExists()) { if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex());
if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex()); if (trade.walletExists()) trade.importMultisigHex();
trade.importMultisigHex();
}
// attempt to sign and publish dispute payout tx if given and not already published // attempt to sign and publish dispute payout tx if given and not already published
if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) { if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) {
@ -270,7 +268,9 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
} }
// sign and publish dispute payout tx if peer still has not published // sign and publish dispute payout tx if peer still has not published
if (!trade.isPayoutPublished()) { if (trade.isPayoutPublished()) {
log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
} else {
try { try {
log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId()); log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
signAndPublishDisputePayoutTx(trade); signAndPublishDisputePayoutTx(trade);
@ -284,14 +284,17 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId); throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId);
} }
} }
} else {
log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
} }
} else { } else {
if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId()); if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId()); else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId());
} }
// complete disputed trade
if (trade.isPayoutPublished()) {
tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED);
}
// We use the chatMessage as we only persist those not the DisputeClosedMessage. // We use the chatMessage as we only persist those not the DisputeClosedMessage.
// If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage. // If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage.
sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null); sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null);

View file

@ -46,8 +46,10 @@ import java.text.DecimalFormatSymbols;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -75,6 +77,7 @@ public class HavenoUtils {
public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss"); public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
private static final int POOL_SIZE = 10; private static final int POOL_SIZE = 10;
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
private static final Map<String, ExecutorService> POOLS = new HashMap<>();
// TODO: better way to share references? // TODO: better way to share references?
public static ArbitrationManager arbitrationManager; public static ArbitrationManager arbitrationManager;
@ -467,6 +470,22 @@ public class HavenoUtils {
} }
} }
public static Future<?> runTask(String threadId, Runnable task) {
synchronized (POOLS) {
if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1));
return POOLS.get(threadId).submit(task);
}
}
public static void removeThreadId(String threadId) {
synchronized (POOLS) {
if (POOLS.containsKey(threadId)) {
POOLS.get(threadId).shutdown();
POOLS.remove(threadId);
}
}
}
/** /**
* Submit tasks to a global thread pool. * Submit tasks to a global thread pool.
*/ */

View file

@ -54,6 +54,7 @@ import haveno.core.trade.messages.InitMultisigRequest;
import haveno.core.trade.messages.InitTradeRequest; import haveno.core.trade.messages.InitTradeRequest;
import haveno.core.trade.messages.SignContractRequest; import haveno.core.trade.messages.SignContractRequest;
import haveno.core.trade.messages.SignContractResponse; import haveno.core.trade.messages.SignContractResponse;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.ArbitratorProtocol; import haveno.core.trade.protocol.ArbitratorProtocol;
import haveno.core.trade.protocol.MakerProtocol; import haveno.core.trade.protocol.MakerProtocol;
import haveno.core.trade.protocol.ProcessModel; import haveno.core.trade.protocol.ProcessModel;
@ -232,7 +233,9 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
@Override @Override
public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) { public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) {
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
new Thread(() -> { if (!(networkEnvelope instanceof TradeMessage)) return;
String tradeId = ((TradeMessage) networkEnvelope).getTradeId();
HavenoUtils.runTask(tradeId, () -> {
if (networkEnvelope instanceof InitTradeRequest) { if (networkEnvelope instanceof InitTradeRequest) {
handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer);
} else if (networkEnvelope instanceof InitMultisigRequest) { } else if (networkEnvelope instanceof InitMultisigRequest) {
@ -246,7 +249,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} else if (networkEnvelope instanceof DepositResponse) { } else if (networkEnvelope instanceof DepositResponse) {
handleDepositResponse((DepositResponse) networkEnvelope, peer); handleDepositResponse((DepositResponse) networkEnvelope, peer);
} }
}).start(); });
} }
@ -1202,6 +1205,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// remove trade // remove trade
tradableList.remove(trade); tradableList.remove(trade);
HavenoUtils.removeThreadId(trade.getId());
// unregister and persist // unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade)); p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));

View file

@ -93,6 +93,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
private int reprocessPaymentReceivedMessageCount; private int reprocessPaymentReceivedMessageCount;
// set comparator for processing mailbox messages
static {
MailboxMessageService.setMailboxMessageComparator(new MailboxMessageComparator());
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -245,12 +250,14 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
if (!trade.isCompleted()) processModel.getP2PService().addDecryptedDirectMessageListener(this); if (!trade.isCompleted()) processModel.getP2PService().addDecryptedDirectMessageListener(this);
// initialize trade // initialize trade
trade.initialize(processModel.getProvider()); synchronized (trade) {
trade.initialize(processModel.getProvider());
// process mailbox messages // process mailbox messages
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService(); MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this); if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
}
// send deposits confirmed message if applicable // send deposits confirmed message if applicable
maybeSendDepositsConfirmedMessage(); maybeSendDepositsConfirmedMessage();
@ -477,18 +484,18 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
handleTaskRunnerSuccess(peer, message); handleTaskRunnerSuccess(peer, message);
return; return;
} }
if (trade.getPayoutTx() != null) {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
return;
}
latchTrade(); latchTrade();
expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED) expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
.with(message) .with(message)
.from(peer) .from(peer))
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks( .setup(tasks(
ApplyFilter.class, ApplyFilter.class,
ProcessPaymentSentMessage.class, ProcessPaymentSentMessage.class,

View file

@ -123,6 +123,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
private boolean isBootstrapped; private boolean isBootstrapped;
private boolean allServicesInitialized; private boolean allServicesInitialized;
private boolean initAfterBootstrapped; private boolean initAfterBootstrapped;
private static Comparator<MailboxMessage> mailboxMessageComparator;
@Inject @Inject
public MailboxMessageService(NetworkNode networkNode, public MailboxMessageService(NetworkNode networkNode,
@ -182,9 +183,11 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
// Those outdated messages would then stay in the network until TTL triggers removal. // Those outdated messages would then stay in the network until TTL triggers removal.
// By not applying large messages we reduce the impact of such cases at costs of extra loading costs if the message is still alive. // By not applying large messages we reduce the impact of such cases at costs of extra loading costs if the message is still alive.
if (serializedSize < MAX_SERIALIZED_SIZE) { if (serializedSize < MAX_SERIALIZED_SIZE) {
mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem); synchronized (mailboxMessageList) {
mailboxMessageList.add(mailboxItem); mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem);
totalSize.getAndAdd(serializedSize); mailboxMessageList.add(mailboxItem);
totalSize.getAndAdd(serializedSize);
}
// We add it to our map so that it get added to the excluded key set we send for // We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at // the initial data requests. So that helps to lower the load for mailbox messages at
@ -330,9 +333,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
*/ */
public void removeMailboxMsg(MailboxMessage mailboxMessage) { public void removeMailboxMsg(MailboxMessage mailboxMessage) {
if (isBootstrapped) { if (isBootstrapped) {
// We need to delay a bit to not get a ConcurrentModificationException as we might iterate over synchronized (mailboxMessageList) {
// mailboxMessageList while getting called.
UserThread.execute(() -> {
String uid = mailboxMessage.getUid(); String uid = mailboxMessage.getUid();
if (!mailboxItemsByUid.containsKey(uid)) { if (!mailboxItemsByUid.containsKey(uid)) {
return; return;
@ -349,7 +350,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
// call removeMailboxItemFromMap here. The onRemoved only removes foreign mailBoxMessages. // call removeMailboxItemFromMap here. The onRemoved only removes foreign mailBoxMessages.
log.trace("## removeMailboxMsg uid={}", uid); log.trace("## removeMailboxMsg uid={}", uid);
removeMailboxItemFromLocalStore(uid); removeMailboxItemFromLocalStore(uid);
}); }
} else { } else {
// In case the network was not ready yet we try again later // In case the network was not ready yet we try again later
UserThread.runAfter(() -> removeMailboxMsg(mailboxMessage), 30); UserThread.runAfter(() -> removeMailboxMsg(mailboxMessage), 30);
@ -397,7 +398,24 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
.forEach(this::removeMailboxItemFromLocalStore); .forEach(this::removeMailboxItemFromLocalStore);
} }
public static void setMailboxMessageComparator(Comparator<MailboxMessage> comparator) {
mailboxMessageComparator = comparator;
}
public static class DecryptedMessageWithPubKeyComparator implements Comparator<DecryptedMessageWithPubKey> {
@Override
public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) {
if (m1.getNetworkEnvelope() instanceof MailboxMessage) {
if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope());
else return 1;
} else {
return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0;
}
}
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Private // Private
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -429,7 +447,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) { public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) {
UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> handleMailboxItem(e))); new Thread(() -> handleMailboxItems(decryptedMailboxMessageWithEntries)).start();
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
@ -471,16 +489,42 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
return new MailboxItem(protectedMailboxStorageEntry, null); return new MailboxItem(protectedMailboxStorageEntry, null);
} }
private void handleMailboxItems(Set<MailboxItem> mailboxItems) {
// sort mailbox items
List<MailboxItem> mailboxItemsSorted = mailboxItems.stream()
.filter(e -> !e.isMine())
.collect(Collectors.toList());
mailboxItemsSorted.addAll(mailboxItems.stream()
.filter(e -> e.isMine())
.sorted(new MailboxItemComparator())
.collect(Collectors.toList()));
// handle mailbox items
mailboxItemsSorted.forEach(e -> handleMailboxItem(e));
}
private static class MailboxItemComparator implements Comparator<MailboxItem> {
private DecryptedMessageWithPubKeyComparator comparator = new DecryptedMessageWithPubKeyComparator();
@Override
public int compare(MailboxItem m1, MailboxItem m2) {
return comparator.compare(m1.getDecryptedMessageWithPubKey(), m2.getDecryptedMessageWithPubKey());
}
}
private void handleMailboxItem(MailboxItem mailboxItem) { private void handleMailboxItem(MailboxItem mailboxItem) {
String uid = mailboxItem.getUid(); String uid = mailboxItem.getUid();
if (!mailboxItemsByUid.containsKey(uid)) { synchronized (mailboxMessageList) {
mailboxItemsByUid.put(uid, mailboxItem); if (!mailboxItemsByUid.containsKey(uid)) {
mailboxMessageList.add(mailboxItem); mailboxItemsByUid.put(uid, mailboxItem);
log.trace("## handleMailboxItem uid={}\nhash={}", mailboxMessageList.add(mailboxItem);
uid, log.trace("## handleMailboxItem uid={}\nhash={}",
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload())); uid,
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()));
requestPersistence(); requestPersistence();
}
} }
// In case we had the item already stored we still prefer to apply it again to the domain. // In case we had the item already stored we still prefer to apply it again to the domain.