From 69a085e9a6a44687299da0eeef506b8285dfe6fc Mon Sep 17 00:00:00 2001 From: Josh Babb Date: Thu, 25 May 2023 15:52:07 -0500 Subject: [PATCH] rpc2.dart -> rpc.dart --- lib/electrumx_rpc/electrumx.dart | 2 +- lib/electrumx_rpc/rpc.dart | 268 +++++++++++++++++++++---------- lib/electrumx_rpc/rpc2.dart | 222 ------------------------- test/electrumx_test.dart | 2 +- test/electrumx_test.mocks.dart | 2 +- test/json_rpc_test.dart | 2 +- 6 files changed, 185 insertions(+), 313 deletions(-) delete mode 100644 lib/electrumx_rpc/rpc2.dart diff --git a/lib/electrumx_rpc/electrumx.dart b/lib/electrumx_rpc/electrumx.dart index c49a33b44..8803f12fa 100644 --- a/lib/electrumx_rpc/electrumx.dart +++ b/lib/electrumx_rpc/electrumx.dart @@ -3,7 +3,7 @@ import 'dart:io'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:decimal/decimal.dart'; -import 'package:stackwallet/electrumx_rpc/rpc2.dart'; +import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart'; import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/prefs.dart'; diff --git a/lib/electrumx_rpc/rpc.dart b/lib/electrumx_rpc/rpc.dart index 6018ee923..7dcf77350 100644 --- a/lib/electrumx_rpc/rpc.dart +++ b/lib/electrumx_rpc/rpc.dart @@ -2,8 +2,8 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:typed_data'; -import 'package:mutex/mutex.dart'; +import 'package:mutex/mutex.dart'; import 'package:stackwallet/utilities/logger.dart'; // hacky fix to receive large jsonrpc responses @@ -19,110 +19,204 @@ class JsonRPC { final int port; final Duration connectionTimeout; - Socket? socket; + final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue(); + Socket? _socket; StreamSubscription? _subscription; - final m = Mutex(); + void _dataHandler(List data) { + if (_requestQueue.isEmpty) { + // probably just return although this case should never actually hit + // TODO anything else here? + return; + } - void Function(List)? _onData; - void Function(Object, StackTrace)? _onError; + final req = _requestQueue.next; + req.appendDataAndCheckIfComplete(data); + + if (req.isComplete) { + _onReqCompleted(req); + } + } + + void _errorHandler(Object error, StackTrace trace) { + Logging.instance.log( + "JsonRPC errorHandler: $error\n$trace", + level: LogLevel.Error, + ); + + final req = _requestQueue.next; + req.completer.completeError(error, trace); + _onReqCompleted(req); + } + + void _doneHandler() { + Logging.instance.log( + "JsonRPC doneHandler: " + "connection closed to ${_socket?.address}:${_socket?.port}, destroying socket", + level: LogLevel.Info, + ); + _socket?.destroy(); + _socket = null; // is this redundant? + // should we also cancel and/or null the subscription? + + if (_requestQueue.isNotEmpty) { + // TODO iterate over the remaining requests and if they are not isComplete then complete the completer with an error + Logging.instance.log( + "JsonRPC doneHandler: queue not empty but connection closed, completing pending requests with errors", + level: LogLevel.Error, + ); + _errorPendingRequests(); + } + } + + Future _onReqCompleted(_JsonRPCRequest req) async { + await _requestQueue.remove(req); + if (_requestQueue.isNotEmpty) { + _sendNextAvailableRequest(); + } + } + + void _sendNextAvailableRequest() { + if (_requestQueue.isEmpty) { + // TODO handle properly + throw Exception("JSON RPC queue empty"); + } + + final req = _requestQueue.next; + + _socket!.write('${req.jsonRequest}\r\n'); + Logging.instance.log( + "JsonRPC request: wrote request ${req.jsonRequest} " + "to socket ${_socket?.address}:${_socket?.port}", + level: LogLevel.Info, + ); + } + + void _errorPendingRequests() { + if (_requestQueue.isNotEmpty) { + final req = _requestQueue.next; + if (!(req.isComplete)) { + req.completer.completeError('JsonRPC doneHandler: socket closed before request could complete'); + _requestQueue.remove(req).then((ret) { + if (_requestQueue.isNotEmpty) { + _errorPendingRequests(); + } + }); + } + } else { + Logging.instance.log( + "JsonRPC _errorPendingRequests: done completing pending requests with errors", + level: LogLevel.Info, + ); + } + } Future request(String jsonRpcRequest) async { - final completer = Completer(); - final List responseData = []; - - void dataHandler(List data) { - responseData.addAll(data); - - // 0x0A is newline - // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html - if (data.last == 0x0A) { - try { - final response = json.decode(String.fromCharCodes(responseData)); - completer.complete(response); // TODO only complete on last chunk? - } catch (e, s) { - Logging.instance - .log("JsonRPC json.decode: $e\n$s", level: LogLevel.Error); - completer.completeError(e, s); - } finally { - Logging.instance.log( - "JsonRPC dataHandler: not destroying socket ${socket?.address}:${socket?.port}", - level: LogLevel.Info, - ); - // socket?.destroy(); - // TODO is this all we need to do? - } - } - } - - _onData = dataHandler; - - void errorHandler(Object error, StackTrace trace) { - Logging.instance - .log("JsonRPC errorHandler: $error\n$trace", level: LogLevel.Error); - completer.completeError(error, trace); - Logging.instance.log( - "JsonRPC errorHandler: not destroying socket ${socket?.address}:${socket?.port}", - level: LogLevel.Info, - ); - // socket?.destroy(); - // TODO do we need to recreate the socket? - } - - _onError = errorHandler; - - void doneHandler() { - Logging.instance.log( - "JsonRPC doneHandler: not destroying socket ${socket?.address}:${socket?.port}", - level: LogLevel.Info, - ); - // socket?.destroy(); - m.release(); - // TODO is this all we need? - } - - if (socket != null) { - // TODO check if the socket is valid, alive, connected, etc - } else { + // todo: handle this better? + // Do we need to check the subscription, too? + if (_socket == null) { Logging.instance.log( "JsonRPC request: opening socket $host:$port", level: LogLevel.Info, ); + await connect(); } - // Do we need to check the subscription, too? - await m.acquire(); + final req = _JsonRPCRequest( + jsonRequest: jsonRpcRequest, + completer: Completer(), + ); - if (useSSL) { - socket ??= await SecureSocket.connect(host, port, - timeout: connectionTimeout, onBadCertificate: (_) => true); // TODO do not automatically trust bad certificates - _subscription ??= socket!.listen( - _onData, - onError: _onError, - onDone: doneHandler, - cancelOnError: true, - ); + await _requestQueue.add(req); + + // if this is the only/first request then send it right away + if (_requestQueue.length == 1) { + _sendNextAvailableRequest(); } else { - socket ??= await Socket.connect( + Logging.instance.log( + "JsonRPC request: queued request $jsonRpcRequest " + "to socket ${_socket?.address}:${_socket?.port}", + level: LogLevel.Info, + ); + } + + return req.completer.future; + } + + Future disconnect() async { + await _subscription?.cancel(); + _subscription = null; + _socket?.destroy(); + } + + Future connect() async { + if (useSSL) { + _socket ??= await SecureSocket.connect( + host, + port, + timeout: connectionTimeout, + onBadCertificate: (_) => true, + ); // TODO do not automatically trust bad certificates + } else { + _socket ??= await Socket.connect( host, port, timeout: connectionTimeout, ); - _subscription ??= socket!.listen( - _onData, - onError: _onError, - onDone: doneHandler, - cancelOnError: true, - ); } - - socket!.write('$jsonRpcRequest\r\n'); - - Logging.instance.log( - "JsonRPC request: wrote request $jsonRpcRequest to socket ${socket?.address}:${socket?.port}", - level: LogLevel.Info, + await _subscription?.cancel(); + _subscription = _socket!.listen( + _dataHandler, + onError: _errorHandler, + onDone: _doneHandler, + cancelOnError: true, ); - - return completer.future; } } + +// mutex *may* not be needed as the protected functions are not async +class _JsonRPCRequestQueue { + final _m = Mutex(); + final List<_JsonRPCRequest> _rq = []; + + Future add(_JsonRPCRequest req) async { + await _m.protect(() async => _rq.add(req)); + } + + Future remove(_JsonRPCRequest req) async { + await _m.protect(() async => _rq.remove(req)); + } + + bool get isEmpty => _rq.isEmpty; + bool get isNotEmpty => _rq.isNotEmpty; + int get length => _rq.length; + _JsonRPCRequest get next => _rq.first; +} + +class _JsonRPCRequest { + final String jsonRequest; + final Completer completer; + final List _responseData = []; + + _JsonRPCRequest({required this.jsonRequest, required this.completer}); + + void appendDataAndCheckIfComplete(List data) { + _responseData.addAll(data); + // 0x0A is newline + // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html + if (data.last == 0x0A) { + try { + final response = json.decode(String.fromCharCodes(_responseData)); + completer.complete(response); + } catch (e, s) { + Logging.instance.log( + "JsonRPC json.decode: $e\n$s", + level: LogLevel.Error, + ); + completer.completeError(e, s); + } + } + } + + bool get isComplete => completer.isCompleted; +} diff --git a/lib/electrumx_rpc/rpc2.dart b/lib/electrumx_rpc/rpc2.dart deleted file mode 100644 index 7dcf77350..000000000 --- a/lib/electrumx_rpc/rpc2.dart +++ /dev/null @@ -1,222 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; -import 'dart:typed_data'; - -import 'package:mutex/mutex.dart'; -import 'package:stackwallet/utilities/logger.dart'; - -// hacky fix to receive large jsonrpc responses -class JsonRPC { - JsonRPC({ - required this.host, - required this.port, - this.useSSL = false, - this.connectionTimeout = const Duration(seconds: 60), - }); - final bool useSSL; - final String host; - final int port; - final Duration connectionTimeout; - - final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue(); - Socket? _socket; - StreamSubscription? _subscription; - - void _dataHandler(List data) { - if (_requestQueue.isEmpty) { - // probably just return although this case should never actually hit - // TODO anything else here? - return; - } - - final req = _requestQueue.next; - req.appendDataAndCheckIfComplete(data); - - if (req.isComplete) { - _onReqCompleted(req); - } - } - - void _errorHandler(Object error, StackTrace trace) { - Logging.instance.log( - "JsonRPC errorHandler: $error\n$trace", - level: LogLevel.Error, - ); - - final req = _requestQueue.next; - req.completer.completeError(error, trace); - _onReqCompleted(req); - } - - void _doneHandler() { - Logging.instance.log( - "JsonRPC doneHandler: " - "connection closed to ${_socket?.address}:${_socket?.port}, destroying socket", - level: LogLevel.Info, - ); - _socket?.destroy(); - _socket = null; // is this redundant? - // should we also cancel and/or null the subscription? - - if (_requestQueue.isNotEmpty) { - // TODO iterate over the remaining requests and if they are not isComplete then complete the completer with an error - Logging.instance.log( - "JsonRPC doneHandler: queue not empty but connection closed, completing pending requests with errors", - level: LogLevel.Error, - ); - _errorPendingRequests(); - } - } - - Future _onReqCompleted(_JsonRPCRequest req) async { - await _requestQueue.remove(req); - if (_requestQueue.isNotEmpty) { - _sendNextAvailableRequest(); - } - } - - void _sendNextAvailableRequest() { - if (_requestQueue.isEmpty) { - // TODO handle properly - throw Exception("JSON RPC queue empty"); - } - - final req = _requestQueue.next; - - _socket!.write('${req.jsonRequest}\r\n'); - Logging.instance.log( - "JsonRPC request: wrote request ${req.jsonRequest} " - "to socket ${_socket?.address}:${_socket?.port}", - level: LogLevel.Info, - ); - } - - void _errorPendingRequests() { - if (_requestQueue.isNotEmpty) { - final req = _requestQueue.next; - if (!(req.isComplete)) { - req.completer.completeError('JsonRPC doneHandler: socket closed before request could complete'); - _requestQueue.remove(req).then((ret) { - if (_requestQueue.isNotEmpty) { - _errorPendingRequests(); - } - }); - } - } else { - Logging.instance.log( - "JsonRPC _errorPendingRequests: done completing pending requests with errors", - level: LogLevel.Info, - ); - } - } - - Future request(String jsonRpcRequest) async { - // todo: handle this better? - // Do we need to check the subscription, too? - if (_socket == null) { - Logging.instance.log( - "JsonRPC request: opening socket $host:$port", - level: LogLevel.Info, - ); - await connect(); - } - - final req = _JsonRPCRequest( - jsonRequest: jsonRpcRequest, - completer: Completer(), - ); - - await _requestQueue.add(req); - - // if this is the only/first request then send it right away - if (_requestQueue.length == 1) { - _sendNextAvailableRequest(); - } else { - Logging.instance.log( - "JsonRPC request: queued request $jsonRpcRequest " - "to socket ${_socket?.address}:${_socket?.port}", - level: LogLevel.Info, - ); - } - - return req.completer.future; - } - - Future disconnect() async { - await _subscription?.cancel(); - _subscription = null; - _socket?.destroy(); - } - - Future connect() async { - if (useSSL) { - _socket ??= await SecureSocket.connect( - host, - port, - timeout: connectionTimeout, - onBadCertificate: (_) => true, - ); // TODO do not automatically trust bad certificates - } else { - _socket ??= await Socket.connect( - host, - port, - timeout: connectionTimeout, - ); - } - await _subscription?.cancel(); - _subscription = _socket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); - } -} - -// mutex *may* not be needed as the protected functions are not async -class _JsonRPCRequestQueue { - final _m = Mutex(); - final List<_JsonRPCRequest> _rq = []; - - Future add(_JsonRPCRequest req) async { - await _m.protect(() async => _rq.add(req)); - } - - Future remove(_JsonRPCRequest req) async { - await _m.protect(() async => _rq.remove(req)); - } - - bool get isEmpty => _rq.isEmpty; - bool get isNotEmpty => _rq.isNotEmpty; - int get length => _rq.length; - _JsonRPCRequest get next => _rq.first; -} - -class _JsonRPCRequest { - final String jsonRequest; - final Completer completer; - final List _responseData = []; - - _JsonRPCRequest({required this.jsonRequest, required this.completer}); - - void appendDataAndCheckIfComplete(List data) { - _responseData.addAll(data); - // 0x0A is newline - // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html - if (data.last == 0x0A) { - try { - final response = json.decode(String.fromCharCodes(_responseData)); - completer.complete(response); - } catch (e, s) { - Logging.instance.log( - "JsonRPC json.decode: $e\n$s", - level: LogLevel.Error, - ); - completer.completeError(e, s); - } - } - } - - bool get isComplete => completer.isCompleted; -} diff --git a/test/electrumx_test.dart b/test/electrumx_test.dart index b4e11cc0a..3ae162529 100644 --- a/test/electrumx_test.dart +++ b/test/electrumx_test.dart @@ -2,7 +2,7 @@ import 'package:flutter_test/flutter_test.dart'; import 'package:mockito/annotations.dart'; import 'package:mockito/mockito.dart'; import 'package:stackwallet/electrumx_rpc/electrumx.dart'; -import 'package:stackwallet/electrumx_rpc/rpc2.dart'; +import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/utilities/prefs.dart'; import 'electrumx_test.mocks.dart'; diff --git a/test/electrumx_test.mocks.dart b/test/electrumx_test.mocks.dart index a39ae7d19..1cfe9cf4a 100644 --- a/test/electrumx_test.mocks.dart +++ b/test/electrumx_test.mocks.dart @@ -7,7 +7,7 @@ import 'dart:async' as _i3; import 'dart:ui' as _i7; import 'package:mockito/mockito.dart' as _i1; -import 'package:stackwallet/electrumx_rpc/rpc2.dart' as _i2; +import 'package:stackwallet/electrumx_rpc/rpc.dart' as _i2; import 'package:stackwallet/utilities/enums/backup_frequency_type.dart' as _i6; import 'package:stackwallet/utilities/enums/sync_type_enum.dart' as _i5; import 'package:stackwallet/utilities/prefs.dart' as _i4; diff --git a/test/json_rpc_test.dart b/test/json_rpc_test.dart index 92c102073..ea6b214be 100644 --- a/test/json_rpc_test.dart +++ b/test/json_rpc_test.dart @@ -1,7 +1,7 @@ import 'dart:io'; import 'package:flutter_test/flutter_test.dart'; -import 'package:stackwallet/electrumx_rpc/rpc2.dart'; +import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/utilities/default_nodes.dart'; void main() {