From 800b309a4bbb0264bfd427f452af69f316903a8d Mon Sep 17 00:00:00 2001 From: Fritz Lumnitz Date: Sun, 9 Jan 2022 17:02:44 +0100 Subject: [PATCH] Add notifications listener --- core/src/main/java/bisq/core/api/CoreApi.java | 23 ++++- .../core/api/CoreNotificationService.java | 42 ++++++++ .../java/bisq/core/trade/TradeManager.java | 20 +++- .../daemon/grpc/GrpcNotificationsService.java | 96 +++++++++++++++++++ .../java/bisq/daemon/grpc/GrpcServer.java | 4 +- proto/src/main/proto/grpc.proto | 27 ++++++ 6 files changed, 209 insertions(+), 3 deletions(-) create mode 100644 core/src/main/java/bisq/core/api/CoreNotificationService.java create mode 100644 daemon/src/main/java/bisq/daemon/grpc/GrpcNotificationsService.java diff --git a/core/src/main/java/bisq/core/api/CoreApi.java b/core/src/main/java/bisq/core/api/CoreApi.java index 57678475..944046a3 100644 --- a/core/src/main/java/bisq/core/api/CoreApi.java +++ b/core/src/main/java/bisq/core/api/CoreApi.java @@ -36,6 +36,8 @@ import bisq.common.config.Config; import bisq.common.handlers.ErrorMessageHandler; import bisq.common.handlers.ResultHandler; +import bisq.proto.grpc.NotificationMessage; + import org.bitcoinj.core.Coin; import org.bitcoinj.core.Transaction; @@ -52,6 +54,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import lombok.Getter; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -77,6 +80,7 @@ public class CoreApi { private final CoreTradesService coreTradesService; private final CoreWalletsService walletsService; private final TradeStatisticsManager tradeStatisticsManager; + private final CoreNotificationService notificationService; @Inject public CoreApi(Config config, @@ -87,7 +91,8 @@ public class CoreApi { CorePriceService corePriceService, CoreTradesService coreTradesService, CoreWalletsService walletsService, - TradeStatisticsManager tradeStatisticsManager) { + TradeStatisticsManager tradeStatisticsManager, + CoreNotificationService notificationService) { this.config = config; this.coreDisputeAgentsService = coreDisputeAgentsService; this.coreHelpService = coreHelpService; @@ -97,6 +102,7 @@ public class CoreApi { this.corePriceService = corePriceService; this.walletsService = walletsService; this.tradeStatisticsManager = tradeStatisticsManager; + this.notificationService = notificationService; } @SuppressWarnings("SameReturnValue") @@ -112,6 +118,21 @@ public class CoreApi { coreDisputeAgentsService.registerDisputeAgent(disputeAgentType, registrationKey); } + /////////////////////////////////////////////////////////////////////////////////////////// + // Notifications + /////////////////////////////////////////////////////////////////////////////////////////// + + public interface NotificationListener { + void onMessage(@NonNull NotificationMessage message); + } + + public void addNotificationListener(NotificationListener listener) { + notificationService.addListener(listener); + } + + public void sendNotification(NotificationMessage notification) { + notificationService.sendNotification(notification); + } /////////////////////////////////////////////////////////////////////////////////////////// // Help diff --git a/core/src/main/java/bisq/core/api/CoreNotificationService.java b/core/src/main/java/bisq/core/api/CoreNotificationService.java new file mode 100644 index 00000000..2fee37fd --- /dev/null +++ b/core/src/main/java/bisq/core/api/CoreNotificationService.java @@ -0,0 +1,42 @@ +package bisq.core.api; + +import bisq.core.api.CoreApi.NotificationListener; + +import bisq.proto.grpc.NotificationMessage; + +import javax.inject.Singleton; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +@Singleton +@Slf4j +public class CoreNotificationService { + + private final Object lock = new Object(); + private final List listeners = new LinkedList<>(); + + public void addListener(@NonNull NotificationListener listener) { + synchronized (lock) { + listeners.add(listener); + } + } + + public void sendNotification(@NonNull NotificationMessage notification) { + synchronized (lock) { + for (Iterator iter = listeners.iterator(); iter.hasNext(); ) { + NotificationListener listener = iter.next(); + try { + listener.onMessage(notification); + } catch (RuntimeException e) { + log.warn("Failed to send message {} to listener {}", notification, listener, e); + iter.remove(); + } + } + } + } +} diff --git a/core/src/main/java/bisq/core/trade/TradeManager.java b/core/src/main/java/bisq/core/trade/TradeManager.java index 558e8281..d348b2d4 100644 --- a/core/src/main/java/bisq/core/trade/TradeManager.java +++ b/core/src/main/java/bisq/core/trade/TradeManager.java @@ -17,6 +17,7 @@ package bisq.core.trade; +import bisq.core.api.CoreNotificationService; import bisq.core.btc.model.XmrAddressEntry; import bisq.core.btc.wallet.XmrWalletService; import bisq.core.locale.Res; @@ -33,6 +34,7 @@ import bisq.core.provider.price.PriceFeedService; import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager; import bisq.core.support.dispute.mediation.mediator.Mediator; import bisq.core.support.dispute.mediation.mediator.MediatorManager; +import bisq.core.trade.Trade.Phase; import bisq.core.trade.closed.ClosedTradableManager; import bisq.core.trade.failed.FailedTradesManager; import bisq.core.trade.handlers.TradeResultHandler; @@ -64,6 +66,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.P2PService; import bisq.network.p2p.network.TorNetworkNode; +import bisq.proto.grpc.NotificationMessage; import com.google.common.collect.ImmutableList; import bisq.common.ClockWatcher; import bisq.common.config.Config; @@ -89,7 +92,8 @@ import javafx.collections.ListChangeListener; import javafx.collections.ObservableList; import org.bouncycastle.crypto.params.KeyParameter; - +import org.fxmisc.easybind.EasyBind; +import org.fxmisc.easybind.Subscription; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -127,6 +131,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi @Getter private final KeyRing keyRing; private final XmrWalletService xmrWalletService; + private final CoreNotificationService notificationService; private final OfferBookService offerBookService; private final OpenOfferManager openOfferManager; private final ClosedTradableManager closedTradableManager; @@ -166,6 +171,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi public TradeManager(User user, KeyRing keyRing, XmrWalletService xmrWalletService, + CoreNotificationService notificationService, OfferBookService offerBookService, OpenOfferManager openOfferManager, ClosedTradableManager closedTradableManager, @@ -186,6 +192,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi this.user = user; this.keyRing = keyRing; this.xmrWalletService = xmrWalletService; + this.notificationService = notificationService; this.offerBookService = offerBookService; this.openOfferManager = openOfferManager; this.closedTradableManager = closedTradableManager; @@ -510,6 +517,17 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi trade.getSelf().setReserveTxKey(openOffer.getReserveTxKey()); trade.getSelf().setReserveTxKeyImages(offer.getOfferPayload().getReserveTxKeyImages()); tradableList.add(trade); + + // notify on phase changes + // TODO (woodser): save subscription, bind on startup + EasyBind.subscribe(trade.statePhaseProperty(), phase -> { + if (phase == Phase.DEPOSIT_PUBLISHED) { + notificationService.sendNotification(NotificationMessage.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setTitle("Offer Taken") + .setMessage("Your offer " + offer.getId() + " has been accepted").build()); + } + }); ((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> { log.warn("Maker error during trade initialization: " + errorMessage); diff --git a/daemon/src/main/java/bisq/daemon/grpc/GrpcNotificationsService.java b/daemon/src/main/java/bisq/daemon/grpc/GrpcNotificationsService.java new file mode 100644 index 00000000..02cac936 --- /dev/null +++ b/daemon/src/main/java/bisq/daemon/grpc/GrpcNotificationsService.java @@ -0,0 +1,96 @@ +package bisq.daemon.grpc; + +import bisq.core.api.CoreApi; +import bisq.core.api.CoreApi.NotificationListener; + +import bisq.proto.grpc.NotificationMessage; +import bisq.proto.grpc.NotificationsGrpc.NotificationsImplBase; +import bisq.proto.grpc.RegisterNotificationListenerRequest; +import bisq.proto.grpc.SendNotificationReply; +import bisq.proto.grpc.SendNotificationRequest; + +import io.grpc.ServerInterceptor; +import io.grpc.stub.StreamObserver; + +import javax.inject.Inject; + +import java.util.HashMap; +import java.util.Optional; + +import lombok.NonNull; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; + +import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor; +import static bisq.proto.grpc.NotificationsGrpc.getRegisterNotificationListenerMethod; +import static bisq.proto.grpc.NotificationsGrpc.getSendNotificationMethod; +import static java.util.concurrent.TimeUnit.SECONDS; + + + +import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor; +import bisq.daemon.grpc.interceptor.GrpcCallRateMeter; + +@Slf4j +class GrpcNotificationsService extends NotificationsImplBase { + + private final CoreApi coreApi; + private final GrpcExceptionHandler exceptionHandler; + + @Inject + public GrpcNotificationsService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) { + this.coreApi = coreApi; + this.exceptionHandler = exceptionHandler; + } + + @Override + public void registerNotificationListener(RegisterNotificationListenerRequest request, + StreamObserver responseObserver) { + try { + coreApi.addNotificationListener(new GrpcNotificationListener(responseObserver)); + // No onNext / onCompleted, as the response observer should be kept open + } catch (Throwable t) { + exceptionHandler.handleException(log, t, responseObserver); + } + } + + @Override + public void sendNotification(SendNotificationRequest request, + StreamObserver responseObserver) { + try { + coreApi.sendNotification(request.getNotification()); + responseObserver.onNext(SendNotificationReply.newBuilder().build()); + responseObserver.onCompleted(); + } catch (Throwable t) { + exceptionHandler.handleException(log, t, responseObserver); + } + } + + @Value + private static class GrpcNotificationListener implements NotificationListener { + + @NonNull + StreamObserver responseObserver; + + @Override + public void onMessage(@NonNull NotificationMessage message) { + responseObserver.onNext(message); + } + } + + final ServerInterceptor[] interceptors() { + Optional rateMeteringInterceptor = rateMeteringInterceptor(); + return rateMeteringInterceptor.map(serverInterceptor -> + new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]); + } + + final Optional rateMeteringInterceptor() { + return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass()) + .or(() -> Optional.of(CallRateMeteringInterceptor.valueOf( + new HashMap<>() {{ + put(getRegisterNotificationListenerMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS)); + put(getSendNotificationMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS)); + }} + ))); + } +} diff --git a/daemon/src/main/java/bisq/daemon/grpc/GrpcServer.java b/daemon/src/main/java/bisq/daemon/grpc/GrpcServer.java index b5a8b889..8bde9587 100644 --- a/daemon/src/main/java/bisq/daemon/grpc/GrpcServer.java +++ b/daemon/src/main/java/bisq/daemon/grpc/GrpcServer.java @@ -58,7 +58,8 @@ public class GrpcServer { GrpcVersionService versionService, GrpcGetTradeStatisticsService tradeStatisticsService, GrpcTradesService tradesService, - GrpcWalletsService walletsService) { + GrpcWalletsService walletsService, + GrpcNotificationsService notificationsService) { this.server = ServerBuilder.forPort(config.apiPort) .executor(UserThread.getExecutor()) .addService(interceptForward(disputeAgentsService, disputeAgentsService.interceptors())) @@ -71,6 +72,7 @@ public class GrpcServer { .addService(interceptForward(tradesService, tradesService.interceptors())) .addService(interceptForward(versionService, versionService.interceptors())) .addService(interceptForward(walletsService, walletsService.interceptors())) + .addService(interceptForward(notificationsService, notificationsService.interceptors())) .intercept(passwordAuthInterceptor) .build(); coreContext.setApiUser(true); diff --git a/proto/src/main/proto/grpc.proto b/proto/src/main/proto/grpc.proto index f3607ae8..cb733859 100644 --- a/proto/src/main/proto/grpc.proto +++ b/proto/src/main/proto/grpc.proto @@ -40,6 +40,33 @@ message RegisterDisputeAgentRequest { message RegisterDisputeAgentReply { } +/////////////////////////////////////////////////////////////////////////////////////////// +// Notifications +/////////////////////////////////////////////////////////////////////////////////////////// + +service Notifications { + rpc RegisterNotificationListener (RegisterNotificationListenerRequest) returns (stream NotificationMessage) { + } + rpc SendNotification (SendNotificationRequest) returns (SendNotificationReply) { // only used for testing + } +} + +message RegisterNotificationListenerRequest { +} + +message NotificationMessage { + int64 timestamp = 1; + string title = 2; + string message = 3; +} + +message SendNotificationRequest { + NotificationMessage notification = 1; +} + +message SendNotificationReply { +} + /////////////////////////////////////////////////////////////////////////////////////////// // Help ///////////////////////////////////////////////////////////////////////////////////////////