Add notifications listener

This commit is contained in:
Fritz Lumnitz 2022-01-09 17:02:44 +01:00 committed by woodser
parent 8332964ac5
commit 800b309a4b
6 changed files with 209 additions and 3 deletions

View file

@ -36,6 +36,8 @@ import bisq.common.config.Config;
import bisq.common.handlers.ErrorMessageHandler;
import bisq.common.handlers.ResultHandler;
import bisq.proto.grpc.NotificationMessage;
import org.bitcoinj.core.Coin;
import org.bitcoinj.core.Transaction;
@ -52,6 +54,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@ -77,6 +80,7 @@ public class CoreApi {
private final CoreTradesService coreTradesService;
private final CoreWalletsService walletsService;
private final TradeStatisticsManager tradeStatisticsManager;
private final CoreNotificationService notificationService;
@Inject
public CoreApi(Config config,
@ -87,7 +91,8 @@ public class CoreApi {
CorePriceService corePriceService,
CoreTradesService coreTradesService,
CoreWalletsService walletsService,
TradeStatisticsManager tradeStatisticsManager) {
TradeStatisticsManager tradeStatisticsManager,
CoreNotificationService notificationService) {
this.config = config;
this.coreDisputeAgentsService = coreDisputeAgentsService;
this.coreHelpService = coreHelpService;
@ -97,6 +102,7 @@ public class CoreApi {
this.corePriceService = corePriceService;
this.walletsService = walletsService;
this.tradeStatisticsManager = tradeStatisticsManager;
this.notificationService = notificationService;
}
@SuppressWarnings("SameReturnValue")
@ -112,6 +118,21 @@ public class CoreApi {
coreDisputeAgentsService.registerDisputeAgent(disputeAgentType, registrationKey);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Notifications
///////////////////////////////////////////////////////////////////////////////////////////
public interface NotificationListener {
void onMessage(@NonNull NotificationMessage message);
}
public void addNotificationListener(NotificationListener listener) {
notificationService.addListener(listener);
}
public void sendNotification(NotificationMessage notification) {
notificationService.sendNotification(notification);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Help

View file

@ -0,0 +1,42 @@
package bisq.core.api;
import bisq.core.api.CoreApi.NotificationListener;
import bisq.proto.grpc.NotificationMessage;
import javax.inject.Singleton;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class CoreNotificationService {
private final Object lock = new Object();
private final List<NotificationListener> listeners = new LinkedList<>();
public void addListener(@NonNull NotificationListener listener) {
synchronized (lock) {
listeners.add(listener);
}
}
public void sendNotification(@NonNull NotificationMessage notification) {
synchronized (lock) {
for (Iterator<NotificationListener> iter = listeners.iterator(); iter.hasNext(); ) {
NotificationListener listener = iter.next();
try {
listener.onMessage(notification);
} catch (RuntimeException e) {
log.warn("Failed to send message {} to listener {}", notification, listener, e);
iter.remove();
}
}
}
}
}

View file

@ -17,6 +17,7 @@
package bisq.core.trade;
import bisq.core.api.CoreNotificationService;
import bisq.core.btc.model.XmrAddressEntry;
import bisq.core.btc.wallet.XmrWalletService;
import bisq.core.locale.Res;
@ -33,6 +34,7 @@ import bisq.core.provider.price.PriceFeedService;
import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import bisq.core.support.dispute.mediation.mediator.Mediator;
import bisq.core.support.dispute.mediation.mediator.MediatorManager;
import bisq.core.trade.Trade.Phase;
import bisq.core.trade.closed.ClosedTradableManager;
import bisq.core.trade.failed.FailedTradesManager;
import bisq.core.trade.handlers.TradeResultHandler;
@ -64,6 +66,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.proto.grpc.NotificationMessage;
import com.google.common.collect.ImmutableList;
import bisq.common.ClockWatcher;
import bisq.common.config.Config;
@ -89,7 +92,8 @@ import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import org.bouncycastle.crypto.params.KeyParameter;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.Subscription;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -127,6 +131,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
@Getter
private final KeyRing keyRing;
private final XmrWalletService xmrWalletService;
private final CoreNotificationService notificationService;
private final OfferBookService offerBookService;
private final OpenOfferManager openOfferManager;
private final ClosedTradableManager closedTradableManager;
@ -166,6 +171,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
public TradeManager(User user,
KeyRing keyRing,
XmrWalletService xmrWalletService,
CoreNotificationService notificationService,
OfferBookService offerBookService,
OpenOfferManager openOfferManager,
ClosedTradableManager closedTradableManager,
@ -186,6 +192,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
this.user = user;
this.keyRing = keyRing;
this.xmrWalletService = xmrWalletService;
this.notificationService = notificationService;
this.offerBookService = offerBookService;
this.openOfferManager = openOfferManager;
this.closedTradableManager = closedTradableManager;
@ -510,6 +517,17 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
trade.getSelf().setReserveTxKey(openOffer.getReserveTxKey());
trade.getSelf().setReserveTxKeyImages(offer.getOfferPayload().getReserveTxKeyImages());
tradableList.add(trade);
// notify on phase changes
// TODO (woodser): save subscription, bind on startup
EasyBind.subscribe(trade.statePhaseProperty(), phase -> {
if (phase == Phase.DEPOSIT_PUBLISHED) {
notificationService.sendNotification(NotificationMessage.newBuilder()
.setTimestamp(System.currentTimeMillis())
.setTitle("Offer Taken")
.setMessage("Your offer " + offer.getId() + " has been accepted").build());
}
});
((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> {
log.warn("Maker error during trade initialization: " + errorMessage);

View file

@ -0,0 +1,96 @@
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.api.CoreApi.NotificationListener;
import bisq.proto.grpc.NotificationMessage;
import bisq.proto.grpc.NotificationsGrpc.NotificationsImplBase;
import bisq.proto.grpc.RegisterNotificationListenerRequest;
import bisq.proto.grpc.SendNotificationReply;
import bisq.proto.grpc.SendNotificationRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.NotificationsGrpc.getRegisterNotificationListenerMethod;
import static bisq.proto.grpc.NotificationsGrpc.getSendNotificationMethod;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcNotificationsService extends NotificationsImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcNotificationsService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void registerNotificationListener(RegisterNotificationListenerRequest request,
StreamObserver<NotificationMessage> responseObserver) {
try {
coreApi.addNotificationListener(new GrpcNotificationListener(responseObserver));
// No onNext / onCompleted, as the response observer should be kept open
} catch (Throwable t) {
exceptionHandler.handleException(log, t, responseObserver);
}
}
@Override
public void sendNotification(SendNotificationRequest request,
StreamObserver<SendNotificationReply> responseObserver) {
try {
coreApi.sendNotification(request.getNotification());
responseObserver.onNext(SendNotificationReply.newBuilder().build());
responseObserver.onCompleted();
} catch (Throwable t) {
exceptionHandler.handleException(log, t, responseObserver);
}
}
@Value
private static class GrpcNotificationListener implements NotificationListener {
@NonNull
StreamObserver<NotificationMessage> responseObserver;
@Override
public void onMessage(@NonNull NotificationMessage message) {
responseObserver.onNext(message);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getRegisterNotificationListenerMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getSendNotificationMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
}}
)));
}
}

View file

@ -58,7 +58,8 @@ public class GrpcServer {
GrpcVersionService versionService,
GrpcGetTradeStatisticsService tradeStatisticsService,
GrpcTradesService tradesService,
GrpcWalletsService walletsService) {
GrpcWalletsService walletsService,
GrpcNotificationsService notificationsService) {
this.server = ServerBuilder.forPort(config.apiPort)
.executor(UserThread.getExecutor())
.addService(interceptForward(disputeAgentsService, disputeAgentsService.interceptors()))
@ -71,6 +72,7 @@ public class GrpcServer {
.addService(interceptForward(tradesService, tradesService.interceptors()))
.addService(interceptForward(versionService, versionService.interceptors()))
.addService(interceptForward(walletsService, walletsService.interceptors()))
.addService(interceptForward(notificationsService, notificationsService.interceptors()))
.intercept(passwordAuthInterceptor)
.build();
coreContext.setApiUser(true);

View file

@ -40,6 +40,33 @@ message RegisterDisputeAgentRequest {
message RegisterDisputeAgentReply {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Notifications
///////////////////////////////////////////////////////////////////////////////////////////
service Notifications {
rpc RegisterNotificationListener (RegisterNotificationListenerRequest) returns (stream NotificationMessage) {
}
rpc SendNotification (SendNotificationRequest) returns (SendNotificationReply) { // only used for testing
}
}
message RegisterNotificationListenerRequest {
}
message NotificationMessage {
int64 timestamp = 1;
string title = 2;
string message = 3;
}
message SendNotificationRequest {
NotificationMessage notification = 1;
}
message SendNotificationReply {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Help
///////////////////////////////////////////////////////////////////////////////////////////