From 4c12e870b1ac34c29cd3c995e279ca12429da999 Mon Sep 17 00:00:00 2001 From: julian Date: Wed, 24 May 2023 12:27:19 -0600 Subject: [PATCH] stream subscription --- lib/electrumx_rpc/rpc.dart | 46 +++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/lib/electrumx_rpc/rpc.dart b/lib/electrumx_rpc/rpc.dart index bf76f66bb..34a354c95 100644 --- a/lib/electrumx_rpc/rpc.dart +++ b/lib/electrumx_rpc/rpc.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'dart:typed_data'; import 'package:stackwallet/utilities/logger.dart'; @@ -18,6 +19,8 @@ class JsonRPC { final Duration connectionTimeout; Socket? socket; + StreamSubscription? _subscription; + Future request(String jsonRpcRequest) async { final completer = Completer(); final List responseData = []; @@ -36,8 +39,10 @@ class JsonRPC { .log("JsonRPC json.decode: $e\n$s", level: LogLevel.Error); completer.completeError(e, s); } finally { - Logging.instance - .log("JsonRPC dataHandler: not destroying socket ${socket?.address}:${socket?.port}", level: LogLevel.Info); + Logging.instance.log( + "JsonRPC dataHandler: not destroying socket ${socket?.address}:${socket?.port}", + level: LogLevel.Info, + ); // socket?.destroy(); // TODO is this all we need to do? } @@ -48,15 +53,19 @@ class JsonRPC { Logging.instance .log("JsonRPC errorHandler: $error\n$trace", level: LogLevel.Error); completer.completeError(error, trace); - Logging.instance - .log("JsonRPC errorHandler: not destroying socket ${socket?.address}:${socket?.port}", level: LogLevel.Info); + Logging.instance.log( + "JsonRPC errorHandler: not destroying socket ${socket?.address}:${socket?.port}", + level: LogLevel.Info, + ); // socket?.destroy(); // TODO do we need to recreate the socket? } void doneHandler() { - Logging.instance - .log("JsonRPC doneHandler: not destroying socket ${socket?.address}:${socket?.port}", level: LogLevel.Info); + Logging.instance.log( + "JsonRPC doneHandler: not destroying socket ${socket?.address}:${socket?.port}", + level: LogLevel.Info, + ); // socket?.destroy(); // TODO is this all we need? } @@ -67,14 +76,25 @@ class JsonRPC { if (useSSL) { socket ??= await SecureSocket.connect(host, port, - timeout: connectionTimeout, - onBadCertificate: (_) => true); - socket!.listen(dataHandler, - onError: errorHandler, onDone: doneHandler, cancelOnError: true); + timeout: connectionTimeout, onBadCertificate: (_) => true); + _subscription ??= socket!.listen( + dataHandler, + onError: errorHandler, + onDone: doneHandler, + cancelOnError: true, + ); } else { - socket ??= await Socket.connect(host, port, timeout: connectionTimeout); - socket!.listen(dataHandler, - onError: errorHandler, onDone: doneHandler, cancelOnError: true); + socket ??= await Socket.connect( + host, + port, + timeout: connectionTimeout, + ); + _subscription ??= socket!.listen( + dataHandler, + onError: errorHandler, + onDone: doneHandler, + cancelOnError: true, + ); } socket?.write('$jsonRpcRequest\r\n');