process messages on user thread and protocol tasks off user thread

This commit is contained in:
woodser 2022-09-22 13:21:04 -04:00
parent cab508c7b5
commit c5f5a5af42
11 changed files with 392 additions and 368 deletions

View file

@ -68,7 +68,7 @@ monerod-local1:
--no-zmq \ --no-zmq \
--add-exclusive-node 127.0.0.1:28080 \ --add-exclusive-node 127.0.0.1:28080 \
--rpc-access-control-origins http://localhost:8080 \ --rpc-access-control-origins http://localhost:8080 \
--fixed-difficulty 400 --fixed-difficulty 800
monerod-local2: monerod-local2:
./.localnet/monerod \ ./.localnet/monerod \
@ -82,7 +82,7 @@ monerod-local2:
--confirm-external-bind \ --confirm-external-bind \
--add-exclusive-node 127.0.0.1:48080 \ --add-exclusive-node 127.0.0.1:48080 \
--rpc-access-control-origins http://localhost:8080 \ --rpc-access-control-origins http://localhost:8080 \
--fixed-difficulty 400 --fixed-difficulty 800
funding-wallet-local: funding-wallet-local:
./.localnet/monero-wallet-rpc \ ./.localnet/monero-wallet-rpc \

View file

@ -621,89 +621,93 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void processUnpostedOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler private void processUnpostedOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
List<String> errorMessages = new ArrayList<String>(); new Thread(() -> {
for (OpenOffer scheduledOffer : openOffers.getObservableList()) { List<String> errorMessages = new ArrayList<String>();
if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; for (OpenOffer scheduledOffer : openOffers.getObservableList()) {
CountDownLatch latch = new CountDownLatch(openOffers.list.size()); if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue;
processUnpostedOffer(scheduledOffer, (transaction) -> { CountDownLatch latch = new CountDownLatch(1);
latch.countDown(); processUnpostedOffer(scheduledOffer, (transaction) -> {
}, errorMessage -> { latch.countDown();
latch.countDown(); }, errorMessage -> {
errorMessages.add(errorMessage); latch.countDown();
}); errorMessages.add(errorMessage);
TradeUtils.awaitLatch(latch); });
} TradeUtils.awaitLatch(latch);
requestPersistence(); }
if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString()); requestPersistence();
else resultHandler.handleResult(null); if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString());
else resultHandler.handleResult(null);
}).start();
} }
private void processUnpostedOffer(OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { private void processUnpostedOffer(OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
try { new Thread(() -> {
try {
// done processing if wallet not initialized // done processing if wallet not initialized
if (xmrWalletService.getWallet() == null) { if (xmrWalletService.getWallet() == null) {
resultHandler.handleResult(null); resultHandler.handleResult(null);
return; return;
}
// get offer reserve amount
Coin offerReserveAmountCoin = openOffer.getOffer().getReserveAmount();
BigInteger offerReserveAmount = ParsingUtils.centinerosToAtomicUnits(offerReserveAmountCoin.value);
// handle sufficient available balance
if (xmrWalletService.getWallet().getUnlockedBalance(0).compareTo(offerReserveAmount) >= 0) {
// split outputs if applicable
boolean splitOutput = openOffer.isAutoSplit(); // TODO: determine if output needs split
if (splitOutput) {
throw new Error("Post offer with split output option not yet supported"); // TODO: support scheduling offer with split outputs
} }
// otherwise sign and post offer // get offer reserve amount
else { Coin offerReserveAmountCoin = openOffer.getOffer().getReserveAmount();
signAndPostOffer(openOffer, offerReserveAmountCoin, true, resultHandler, errorMessageHandler); BigInteger offerReserveAmount = ParsingUtils.centinerosToAtomicUnits(offerReserveAmountCoin.value);
}
return;
}
// handle unscheduled offer // handle sufficient available balance
if (openOffer.getScheduledTxHashes() == null) { if (xmrWalletService.getWallet().getUnlockedBalance(0).compareTo(offerReserveAmount) >= 0) {
// check for sufficient balance - scheduled offers amount // split outputs if applicable
if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount()).compareTo(offerReserveAmount) < 0) { boolean splitOutput = openOffer.isAutoSplit(); // TODO: determine if output needs split
throw new RuntimeException("Not enough money in Haveno wallet"); if (splitOutput) {
} throw new Error("Post offer with split output option not yet supported"); // TODO: support scheduling offer with split outputs
// get locked txs
List<MoneroTxWallet> lockedTxs = xmrWalletService.getWallet().getTxs(new MoneroTxQuery().setIsLocked(true));
// get earliest unscheduled txs with sufficient incoming amount
List<String> scheduledTxHashes = new ArrayList<String>();
BigInteger scheduledAmount = new BigInteger("0");
for (MoneroTxWallet lockedTx : lockedTxs) {
if (isTxScheduled(lockedTx.getHash())) continue;
if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue;
scheduledTxHashes.add(lockedTx.getHash());
for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) {
if (transfer.getAccountIndex() == 0) scheduledAmount = scheduledAmount.add(transfer.getAmount());
} }
if (scheduledAmount.compareTo(offerReserveAmount) >= 0) break;
// otherwise sign and post offer
else {
signAndPostOffer(openOffer, offerReserveAmountCoin, true, resultHandler, errorMessageHandler);
}
return;
} }
if (scheduledAmount.compareTo(offerReserveAmount) < 0) throw new RuntimeException("Not enough funds to schedule offer");
// schedule txs // handle unscheduled offer
openOffer.setScheduledTxHashes(scheduledTxHashes); if (openOffer.getScheduledTxHashes() == null) {
openOffer.setScheduledAmount(scheduledAmount.toString());
openOffer.setState(OpenOffer.State.SCHEDULED); // check for sufficient balance - scheduled offers amount
if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount()).compareTo(offerReserveAmount) < 0) {
throw new RuntimeException("Not enough money in Haveno wallet");
}
// get locked txs
List<MoneroTxWallet> lockedTxs = xmrWalletService.getWallet().getTxs(new MoneroTxQuery().setIsLocked(true));
// get earliest unscheduled txs with sufficient incoming amount
List<String> scheduledTxHashes = new ArrayList<String>();
BigInteger scheduledAmount = new BigInteger("0");
for (MoneroTxWallet lockedTx : lockedTxs) {
if (isTxScheduled(lockedTx.getHash())) continue;
if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue;
scheduledTxHashes.add(lockedTx.getHash());
for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) {
if (transfer.getAccountIndex() == 0) scheduledAmount = scheduledAmount.add(transfer.getAmount());
}
if (scheduledAmount.compareTo(offerReserveAmount) >= 0) break;
}
if (scheduledAmount.compareTo(offerReserveAmount) < 0) throw new RuntimeException("Not enough funds to schedule offer");
// schedule txs
openOffer.setScheduledTxHashes(scheduledTxHashes);
openOffer.setScheduledAmount(scheduledAmount.toString());
openOffer.setState(OpenOffer.State.SCHEDULED);
}
// handle result
resultHandler.handleResult(null);
} catch (Exception e) {
e.printStackTrace();
errorMessageHandler.handleErrorMessage(e.getMessage());
} }
}).start();
// handle result
resultHandler.handleResult(null);
} catch (Exception e) {
e.printStackTrace();
errorMessageHandler.handleErrorMessage(e.getMessage());
}
} }
private BigInteger getScheduledAmount() { private BigInteger getScheduledAmount() {
@ -790,6 +794,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
log.info("Received SignOfferRequest from {} with offerId {} and uid {}", log.info("Received SignOfferRequest from {} with offerId {} and uid {}",
peer, request.getOfferId(), request.getUid()); peer, request.getOfferId(), request.getUid());
boolean result = false;
String errorMessage = null; String errorMessage = null;
try { try {
@ -845,9 +850,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
signedOffers.add(signedOffer); signedOffers.add(signedOffer);
requestPersistence(); requestPersistence();
// send ack
sendAckMessage(request.getClass(), peer, request.getPubKeyRing(), request.getOfferId(), request.getUid(), true, errorMessage);
// send response with signature // send response with signature
SignOfferResponse response = new SignOfferResponse(request.getOfferId(), SignOfferResponse response = new SignOfferResponse(request.getOfferId(),
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
@ -874,11 +876,13 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
errorMessage); errorMessage);
} }
}); });
result = true;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
errorMessage = "Exception at handleSignOfferRequest " + e.getMessage(); errorMessage = "Exception at handleSignOfferRequest " + e.getMessage();
log.error(errorMessage); log.error(errorMessage);
sendAckMessage(request.getClass(), peer, request.getPubKeyRing(), request.getOfferId(), request.getUid(), false, errorMessage); } finally {
sendAckMessage(request.getClass(), peer, request.getPubKeyRing(), request.getOfferId(), request.getUid(), result, errorMessage);
} }
} }

View file

@ -93,6 +93,10 @@ public class MakerSendSignOfferRequest extends Task<PlaceOfferModel> {
private void sendSignOfferRequests(SignOfferRequest request, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { private void sendSignOfferRequests(SignOfferRequest request, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
Arbitrator leastUsedArbitrator = DisputeAgentSelection.getLeastUsedArbitrator(model.getTradeStatisticsManager(), model.getArbitratorManager()); Arbitrator leastUsedArbitrator = DisputeAgentSelection.getLeastUsedArbitrator(model.getTradeStatisticsManager(), model.getArbitratorManager());
if (leastUsedArbitrator == null) {
errorMessageHandler.handleErrorMessage("Could not get least used arbitrator");
return;
}
sendSignOfferRequests(request, leastUsedArbitrator.getNodeAddress(), new HashSet<NodeAddress>(), resultHandler, errorMessageHandler); sendSignOfferRequests(request, leastUsedArbitrator.getNodeAddress(), new HashSet<NodeAddress>(), resultHandler, errorMessageHandler);
} }
@ -140,7 +144,7 @@ public class MakerSendSignOfferRequest extends Task<PlaceOfferModel> {
// get registered arbitrator // get registered arbitrator
Arbitrator arbitrator = model.getUser().getAcceptedArbitratorByAddress(arbitratorNodeAddress); Arbitrator arbitrator = model.getUser().getAcceptedArbitratorByAddress(arbitratorNodeAddress);
if (arbitrator == null) throw new RuntimeException("Node address " + arbitratorNodeAddress + " is not a registered arbitrator"); if (arbitrator == null) throw new RuntimeException("Node address " + arbitratorNodeAddress + " is not a registered arbitrator"); // TODO: use error handler
request.getOfferPayload().setArbitratorSigner(arbitratorNodeAddress); request.getOfferPayload().setArbitratorSigner(arbitratorNodeAddress);
// send request to arbitrator // send request to arbitrator

View file

@ -51,30 +51,32 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
System.out.println("ArbitratorProtocol.handleInitTradeRequest()"); System.out.println("ArbitratorProtocol.handleInitTradeRequest()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
this.errorMessageHandler = errorMessageHandler; latchTrade();
processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT) processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set
.with(message) expect(phase(Trade.Phase.INIT)
.from(peer)) .with(message)
.setup(tasks( .from(peer))
ApplyFilter.class, .setup(tasks(
ProcessInitTradeRequest.class, ApplyFilter.class,
ArbitratorProcessReserveTx.class, ProcessInitTradeRequest.class,
ArbitratorSendInitTradeOrMultisigRequests.class) ArbitratorProcessReserveTx.class,
.using(new TradeTaskRunner(trade, ArbitratorSendInitTradeOrMultisigRequests.class)
() -> { .using(new TradeTaskRunner(trade,
startTimeout(TRADE_TIMEOUT); () -> {
handleTaskRunnerSuccess(peer, message); startTimeout(TRADE_TIMEOUT);
}, handleTaskRunnerSuccess(peer, message);
errorMessage -> { },
handleTaskRunnerFault(peer, message, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(peer, message, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
@Override @Override
@ -84,30 +86,32 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handleDepositRequest(DepositRequest request, NodeAddress sender) { public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId()); System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId());
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request); latchTrade();
processModel.setTradeMessage(request); Validator.checkTradeId(processModel.getOfferId(), request);
expect(phase(Trade.Phase.INIT) processModel.setTradeMessage(request);
.with(request) expect(phase(Trade.Phase.INIT)
.from(sender)) .with(request)
.setup(tasks( .from(sender))
ArbitratorProcessDepositRequest.class) .setup(tasks(
.using(new TradeTaskRunner(trade, ArbitratorProcessDepositRequest.class)
() -> { .using(new TradeTaskRunner(trade,
if (trade.getState() == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) { () -> {
stopTimeout(); if (trade.getState() == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) {
this.errorMessageHandler = null; stopTimeout();
} this.errorMessageHandler = null;
handleTaskRunnerSuccess(sender, request); }
}, handleTaskRunnerSuccess(sender, request);
errorMessage -> { },
handleTaskRunnerFault(sender, request, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(sender, request, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
@Override @Override
@ -117,27 +121,29 @@ public class ArbitratorProtocol extends DisputeProtocol {
public void handlePaymentAccountKeyRequest(PaymentAccountKeyRequest request, NodeAddress sender) { public void handlePaymentAccountKeyRequest(PaymentAccountKeyRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handlePaymentAccountKeyRequest() " + trade.getId()); System.out.println("ArbitratorProtocol.handlePaymentAccountKeyRequest() " + trade.getId());
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request); latchTrade();
processModel.setTradeMessage(request); Validator.checkTradeId(processModel.getOfferId(), request);
expect(new Condition(trade) processModel.setTradeMessage(request);
.with(request) expect(new Condition(trade)
.from(sender)) .with(request)
.setup(tasks( .from(sender))
ArbitratorProcessPaymentAccountKeyRequest.class) .setup(tasks(
.using(new TradeTaskRunner(trade, ArbitratorProcessPaymentAccountKeyRequest.class)
() -> { .using(new TradeTaskRunner(trade,
stopTimeout(); () -> {
handleTaskRunnerSuccess(sender, request); stopTimeout();
}, handleTaskRunnerSuccess(sender, request);
errorMessage -> { },
handleTaskRunnerFault(sender, request, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(sender, request, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
protected void handle(PayoutTxPublishedMessage request, NodeAddress peer) { protected void handle(PayoutTxPublishedMessage request, NodeAddress peer) {

View file

@ -54,29 +54,31 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
NodeAddress peer, NodeAddress peer,
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()"); System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
this.errorMessageHandler = errorMessageHandler; latchTrade();
expect(phase(Trade.Phase.INIT) this.errorMessageHandler = errorMessageHandler;
.with(message) expect(phase(Trade.Phase.INIT)
.from(peer)) .with(message)
.setup(tasks( .from(peer))
ProcessInitTradeRequest.class, .setup(tasks(
//ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here ProcessInitTradeRequest.class,
//VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee //ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here
MakerSendInitTradeRequest.class) //VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee
.using(new TradeTaskRunner(trade, MakerSendInitTradeRequest.class)
() -> { .using(new TradeTaskRunner(trade,
startTimeout(TRADE_TIMEOUT); () -> {
handleTaskRunnerSuccess(peer, message); startTimeout(TRADE_TIMEOUT);
}, handleTaskRunnerSuccess(peer, message);
errorMessage -> { },
handleTaskRunnerFault(peer, message, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(peer, message, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
@Override @Override

View file

@ -65,29 +65,31 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void onTakeOffer(TradeResultHandler tradeResultHandler, public void onTakeOffer(TradeResultHandler tradeResultHandler,
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".onTakeOffer()"); System.out.println(getClass().getCanonicalName() + ".onTakeOffer()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
this.tradeResultHandler = tradeResultHandler; latchTrade();
this.errorMessageHandler = errorMessageHandler; this.tradeResultHandler = tradeResultHandler;
expect(phase(Trade.Phase.INIT) this.errorMessageHandler = errorMessageHandler;
.with(TakerEvent.TAKE_OFFER) expect(phase(Trade.Phase.INIT)
.from(trade.getTradingPeerNodeAddress())) .with(TakerEvent.TAKE_OFFER)
.setup(tasks( .from(trade.getTradingPeerNodeAddress()))
ApplyFilter.class, .setup(tasks(
TakerReserveTradeFunds.class, ApplyFilter.class,
TakerSendInitTradeRequestToArbitrator.class) TakerReserveTradeFunds.class,
.using(new TradeTaskRunner(trade, TakerSendInitTradeRequestToArbitrator.class)
() -> { .using(new TradeTaskRunner(trade,
startTimeout(TRADE_TIMEOUT); () -> {
unlatchTrade(); startTimeout(TRADE_TIMEOUT);
}, unlatchTrade();
errorMessage -> { },
handleError(errorMessage); errorMessage -> {
})) handleError(errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
@Override @Override

View file

@ -55,29 +55,31 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
NodeAddress peer, NodeAddress peer,
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()"); System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
this.errorMessageHandler = errorMessageHandler; latchTrade();
expect(phase(Trade.Phase.INIT) this.errorMessageHandler = errorMessageHandler;
.with(message) expect(phase(Trade.Phase.INIT)
.from(peer)) .with(message)
.setup(tasks( .from(peer))
ProcessInitTradeRequest.class, .setup(tasks(
//ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here ProcessInitTradeRequest.class,
//VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee //ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here
MakerSendInitTradeRequest.class) //VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee
.using(new TradeTaskRunner(trade, MakerSendInitTradeRequest.class)
() -> { .using(new TradeTaskRunner(trade,
startTimeout(TRADE_TIMEOUT); () -> {
handleTaskRunnerSuccess(peer, message); startTimeout(TRADE_TIMEOUT);
}, handleTaskRunnerSuccess(peer, message);
errorMessage -> { },
handleTaskRunnerFault(peer, message, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(peer, message, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
@Override @Override

View file

@ -63,29 +63,31 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
public void onTakeOffer(TradeResultHandler tradeResultHandler, public void onTakeOffer(TradeResultHandler tradeResultHandler,
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
System.out.println(getClass().getCanonicalName() + ".onTakeOffer()"); System.out.println(getClass().getCanonicalName() + ".onTakeOffer()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
this.tradeResultHandler = tradeResultHandler; latchTrade();
this.errorMessageHandler = errorMessageHandler; this.tradeResultHandler = tradeResultHandler;
expect(phase(Trade.Phase.INIT) this.errorMessageHandler = errorMessageHandler;
.with(TakerEvent.TAKE_OFFER) expect(phase(Trade.Phase.INIT)
.from(trade.getTradingPeerNodeAddress())) .with(TakerEvent.TAKE_OFFER)
.setup(tasks( .from(trade.getTradingPeerNodeAddress()))
ApplyFilter.class, .setup(tasks(
TakerReserveTradeFunds.class, ApplyFilter.class,
TakerSendInitTradeRequestToArbitrator.class) TakerReserveTradeFunds.class,
.using(new TradeTaskRunner(trade, TakerSendInitTradeRequestToArbitrator.class)
() -> { .using(new TradeTaskRunner(trade,
startTimeout(TRADE_TIMEOUT); () -> {
unlatchTrade(); startTimeout(TRADE_TIMEOUT);
}, unlatchTrade();
errorMessage -> { },
handleError(errorMessage); errorMessage -> {
})) handleError(errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
@Override @Override

View file

@ -229,125 +229,133 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()"); System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), request); latchTrade();
processModel.setTradeMessage(request); Validator.checkTradeId(processModel.getOfferId(), request);
expect(anyPhase(Trade.Phase.INIT) processModel.setTradeMessage(request);
.with(request) expect(anyPhase(Trade.Phase.INIT)
.from(sender)) .with(request)
.setup(tasks( .from(sender))
ProcessInitMultisigRequest.class, .setup(tasks(
MaybeSendSignContractRequest.class) ProcessInitMultisigRequest.class,
.using(new TradeTaskRunner(trade, MaybeSendSignContractRequest.class)
() -> { .using(new TradeTaskRunner(trade,
startTimeout(TRADE_TIMEOUT); () -> {
handleTaskRunnerSuccess(sender, request); startTimeout(TRADE_TIMEOUT);
}, handleTaskRunnerSuccess(sender, request);
errorMessage -> { },
handleTaskRunnerFault(sender, request, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(sender, request, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest() " + trade.getId()); System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest() " + trade.getId());
synchronized (trade) { new Thread(() -> {
Validator.checkTradeId(processModel.getOfferId(), message); synchronized (trade) {
if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message); Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) latchTrade();
.with(message) Validator.checkTradeId(processModel.getOfferId(), message);
.from(sender)) processModel.setTradeMessage(message);
.setup(tasks( expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED)
// TODO (woodser): validate request .with(message)
ProcessSignContractRequest.class) .from(sender))
.using(new TradeTaskRunner(trade, .setup(tasks(
() -> { // TODO (woodser): validate request
startTimeout(TRADE_TIMEOUT); ProcessSignContractRequest.class)
handleTaskRunnerSuccess(sender, message); .using(new TradeTaskRunner(trade,
}, () -> {
errorMessage -> { startTimeout(TRADE_TIMEOUT);
handleTaskRunnerFault(sender, message, errorMessage); handleTaskRunnerSuccess(sender, message);
})) },
.withTimeout(TRADE_TIMEOUT)) // extend timeout errorMessage -> {
.executeTasks(true); handleTaskRunnerFault(sender, message, errorMessage);
awaitTradeLatch(); }))
} else { .withTimeout(TRADE_TIMEOUT)) // extend timeout
// process sign contract request after multisig created .executeTasks(true);
EasyBind.subscribe(trade.stateProperty(), state -> { awaitTradeLatch();
if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock } else {
}); // process sign contract request after multisig created
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock
});
}
} }
} }).start();
} }
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId()); System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId());
synchronized (trade) { new Thread(() -> {
Validator.checkTradeId(processModel.getOfferId(), message); synchronized (trade) {
if (trade.getState() == Trade.State.CONTRACT_SIGNED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message); Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); if (trade.getState() == Trade.State.CONTRACT_SIGNED) {
expect(state(Trade.State.CONTRACT_SIGNED) latchTrade();
.with(message) Validator.checkTradeId(processModel.getOfferId(), message);
.from(sender)) processModel.setTradeMessage(message);
.setup(tasks( expect(state(Trade.State.CONTRACT_SIGNED)
// TODO (woodser): validate request .with(message)
ProcessSignContractResponse.class, .from(sender))
MakerRemoveOpenOffer.class) .setup(tasks(
.using(new TradeTaskRunner(trade, // TODO (woodser): validate request
() -> { ProcessSignContractResponse.class,
startTimeout(TRADE_TIMEOUT); MakerRemoveOpenOffer.class)
handleTaskRunnerSuccess(sender, message); .using(new TradeTaskRunner(trade,
}, () -> {
errorMessage -> { startTimeout(TRADE_TIMEOUT);
handleTaskRunnerFault(sender, message, errorMessage); handleTaskRunnerSuccess(sender, message);
})) },
.withTimeout(TRADE_TIMEOUT)) // extend timeout errorMessage -> {
.executeTasks(true); handleTaskRunnerFault(sender, message, errorMessage);
awaitTradeLatch(); }))
} else { .withTimeout(TRADE_TIMEOUT)) // extend timeout
// process sign contract response after contract signed .executeTasks(true);
EasyBind.subscribe(trade.stateProperty(), state -> { awaitTradeLatch();
if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock } else {
}); // process sign contract response after contract signed
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
});
}
} }
} }).start();
} }
public void handleDepositResponse(DepositResponse response, NodeAddress sender) { public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()"); System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()");
synchronized (trade) { new Thread(() -> {
latchTrade(); synchronized (trade) {
Validator.checkTradeId(processModel.getOfferId(), response); latchTrade();
processModel.setTradeMessage(response); Validator.checkTradeId(processModel.getOfferId(), response);
expect(state(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) processModel.setTradeMessage(response);
.with(response) expect(state(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress() .with(response)
.setup(tasks( .from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
// TODO (woodser): validate request .setup(tasks(
ProcessDepositResponse.class) // TODO (woodser): validate request
.using(new TradeTaskRunner(trade, ProcessDepositResponse.class)
() -> { .using(new TradeTaskRunner(trade,
stopTimeout(); () -> {
this.errorMessageHandler = null; stopTimeout();
handleTaskRunnerSuccess(sender, response); this.errorMessageHandler = null;
if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized handleTaskRunnerSuccess(sender, response);
}, if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized
errorMessage -> { },
handleTaskRunnerFault(sender, response, errorMessage); errorMessage -> {
})) handleTaskRunnerFault(sender, response, errorMessage);
.withTimeout(TRADE_TIMEOUT)) }))
.executeTasks(true); .withTimeout(TRADE_TIMEOUT))
awaitTradeLatch(); .executeTasks(true);
} awaitTradeLatch();
}
}).start();
} }
// TODO (woodser): update to use fluent for consistency // TODO (woodser): update to use fluent for consistency

View file

@ -42,6 +42,7 @@ import bisq.core.offer.Offer;
import bisq.core.offer.OfferDirection; import bisq.core.offer.OfferDirection;
import bisq.core.offer.OpenOfferManager; import bisq.core.offer.OpenOfferManager;
import bisq.core.provider.price.PriceFeedService; import bisq.core.provider.price.PriceFeedService;
import bisq.core.trade.TradeUtils;
import bisq.core.user.Preferences; import bisq.core.user.Preferences;
import bisq.core.util.VolumeUtil; import bisq.core.util.VolumeUtil;
@ -66,6 +67,7 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors; import java.util.stream.Collectors;
class OfferBookChartViewModel extends ActivatableViewModel { class OfferBookChartViewModel extends ActivatableViewModel {
@ -125,19 +127,17 @@ class OfferBookChartViewModel extends ActivatableViewModel {
offerBookListItems = offerBook.getOfferBookListItems(); offerBookListItems = offerBook.getOfferBookListItems();
offerBookListItemsListener = c -> { offerBookListItemsListener = c -> {
UserThread.execute(() -> { c.next();
c.next(); if (c.wasAdded() || c.wasRemoved()) {
if (c.wasAdded() || c.wasRemoved()) { ArrayList<OfferBookListItem> list = new ArrayList<>(c.getRemoved());
ArrayList<OfferBookListItem> list = new ArrayList<>(c.getRemoved()); list.addAll(c.getAddedSubList());
list.addAll(c.getAddedSubList()); if (list.stream()
if (list.stream() .map(OfferBookListItem::getOffer)
.map(OfferBookListItem::getOffer) .anyMatch(e -> e.getCurrencyCode().equals(selectedTradeCurrencyProperty.get().getCode())))
.anyMatch(e -> e.getCurrencyCode().equals(selectedTradeCurrencyProperty.get().getCode()))) updateChartData();
updateChartData(); }
}
fillTradeCurrencies(); fillTradeCurrencies();
});
}; };
currenciesUpdatedListener = (observable, oldValue, newValue) -> { currenciesUpdatedListener = (observable, oldValue, newValue) -> {

View file

@ -432,18 +432,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
// Only receive non - CloseConnectionMessage network_messages // Only receive non - CloseConnectionMessage network_messages
@Override @Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
Connection that = this; checkArgument(connection.equals(this));
connectionThreadPool.submit(new Runnable() { if (networkEnvelope instanceof BundleOfEnvelopes) {
@Override onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection);
public void run() { } else {
checkArgument(connection.equals(that)); UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
if (networkEnvelope instanceof BundleOfEnvelopes) { }
onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection);
} else {
messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection));
}
}
});
} }
private void onBundleOfEnvelopes(BundleOfEnvelopes bundleOfEnvelopes, Connection connection) { private void onBundleOfEnvelopes(BundleOfEnvelopes bundleOfEnvelopes, Connection connection) {
@ -475,8 +469,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
envelopesToProcess.add(networkEnvelope); envelopesToProcess.add(networkEnvelope);
} }
} }
envelopesToProcess.forEach(envelope -> envelopesToProcess.forEach(envelope -> UserThread.execute(() ->
messageListeners.forEach(listener -> listener.onMessage(envelope, connection))); messageListeners.forEach(listener -> listener.onMessage(envelope, connection))));
} }