stream subscription

This commit is contained in:
julian 2023-05-24 12:27:19 -06:00
parent 81689426e2
commit 4c12e870b1

View file

@ -1,6 +1,7 @@
import 'dart:async'; import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'dart:typed_data';
import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/logger.dart';
@ -18,6 +19,8 @@ class JsonRPC {
final Duration connectionTimeout; final Duration connectionTimeout;
Socket? socket; Socket? socket;
StreamSubscription<Uint8List>? _subscription;
Future<dynamic> request(String jsonRpcRequest) async { Future<dynamic> request(String jsonRpcRequest) async {
final completer = Completer<dynamic>(); final completer = Completer<dynamic>();
final List<int> responseData = []; final List<int> responseData = [];
@ -36,8 +39,10 @@ class JsonRPC {
.log("JsonRPC json.decode: $e\n$s", level: LogLevel.Error); .log("JsonRPC json.decode: $e\n$s", level: LogLevel.Error);
completer.completeError(e, s); completer.completeError(e, s);
} finally { } finally {
Logging.instance Logging.instance.log(
.log("JsonRPC dataHandler: not destroying socket ${socket?.address}:${socket?.port}", level: LogLevel.Info); "JsonRPC dataHandler: not destroying socket ${socket?.address}:${socket?.port}",
level: LogLevel.Info,
);
// socket?.destroy(); // socket?.destroy();
// TODO is this all we need to do? // TODO is this all we need to do?
} }
@ -48,15 +53,19 @@ class JsonRPC {
Logging.instance Logging.instance
.log("JsonRPC errorHandler: $error\n$trace", level: LogLevel.Error); .log("JsonRPC errorHandler: $error\n$trace", level: LogLevel.Error);
completer.completeError(error, trace); completer.completeError(error, trace);
Logging.instance Logging.instance.log(
.log("JsonRPC errorHandler: not destroying socket ${socket?.address}:${socket?.port}", level: LogLevel.Info); "JsonRPC errorHandler: not destroying socket ${socket?.address}:${socket?.port}",
level: LogLevel.Info,
);
// socket?.destroy(); // socket?.destroy();
// TODO do we need to recreate the socket? // TODO do we need to recreate the socket?
} }
void doneHandler() { void doneHandler() {
Logging.instance Logging.instance.log(
.log("JsonRPC doneHandler: not destroying socket ${socket?.address}:${socket?.port}", level: LogLevel.Info); "JsonRPC doneHandler: not destroying socket ${socket?.address}:${socket?.port}",
level: LogLevel.Info,
);
// socket?.destroy(); // socket?.destroy();
// TODO is this all we need? // TODO is this all we need?
} }
@ -67,14 +76,25 @@ class JsonRPC {
if (useSSL) { if (useSSL) {
socket ??= await SecureSocket.connect(host, port, socket ??= await SecureSocket.connect(host, port,
timeout: connectionTimeout, timeout: connectionTimeout, onBadCertificate: (_) => true);
onBadCertificate: (_) => true); _subscription ??= socket!.listen(
socket!.listen(dataHandler, dataHandler,
onError: errorHandler, onDone: doneHandler, cancelOnError: true); onError: errorHandler,
onDone: doneHandler,
cancelOnError: true,
);
} else { } else {
socket ??= await Socket.connect(host, port, timeout: connectionTimeout); socket ??= await Socket.connect(
socket!.listen(dataHandler, host,
onError: errorHandler, onDone: doneHandler, cancelOnError: true); port,
timeout: connectionTimeout,
);
_subscription ??= socket!.listen(
dataHandler,
onError: errorHandler,
onDone: doneHandler,
cancelOnError: true,
);
} }
socket?.write('$jsonRpcRequest\r\n'); socket?.write('$jsonRpcRequest\r\n');