diff --git a/lib/electrumx_rpc/electrumx.dart b/lib/electrumx_rpc/electrumx.dart index 20f046320..d8267c7cd 100644 --- a/lib/electrumx_rpc/electrumx.dart +++ b/lib/electrumx_rpc/electrumx.dart @@ -132,13 +132,12 @@ class ElectrumX { final response = await _rpcClient!.request(jsonRequestString); - print("================================================="); - print("TYPE: ${response.runtimeType}"); - print("RESPONSE: $response"); - print("================================================="); + if (response.exception != null) { + throw response.exception!; + } - if (response["error"] != null) { - if (response["error"] + if (response.data["error"] != null) { + if (response.data["error"] .toString() .contains("No such mempool or blockchain transaction")) { throw NoSuchTransactionException( @@ -148,11 +147,15 @@ class ElectrumX { } throw Exception( - "JSONRPC response \ncommand: $command \nargs: $args \nerror: $response"); + "JSONRPC response\n" + " command: $command\n" + " args: $args\n" + " error: $response.data", + ); } currentFailoverIndex = -1; - return response; + return response.data; } on WifiOnlyException { rethrow; } on SocketException { @@ -233,7 +236,13 @@ class ElectrumX { // Logging.instance.log("batch request: $request"); // send batch request - final response = (await _rpcClient!.request(request)) as List; + final jsonRpcResponse = (await _rpcClient!.request(request)); + + if (jsonRpcResponse.exception != null) { + throw jsonRpcResponse.exception!; + } + + final response = jsonRpcResponse.data as List; // check for errors, format and throw if there are any final List errors = []; diff --git a/lib/electrumx_rpc/rpc.dart b/lib/electrumx_rpc/rpc.dart index c7f654cc2..0cc960271 100644 --- a/lib/electrumx_rpc/rpc.dart +++ b/lib/electrumx_rpc/rpc.dart @@ -1,12 +1,12 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; -import 'dart:typed_data'; +import 'package:flutter/foundation.dart'; import 'package:mutex/mutex.dart'; import 'package:stackwallet/utilities/logger.dart'; -// hacky fix to receive large jsonrpc responses +// Json RPC class to handle connecting to electrumx servers class JsonRPC { JsonRPC({ required this.host, @@ -25,90 +25,60 @@ class JsonRPC { StreamSubscription? _subscription; void _dataHandler(List data) { - if (_requestQueue.isEmpty) { - // probably just return although this case should never actually hit - return; - } + _requestQueue.nextIncompleteReq.then((req) { + if (req != null) { + req.appendDataAndCheckIfComplete(data); - final req = _requestQueue.next; - req.appendDataAndCheckIfComplete(data); - - if (req.isComplete) { - _onReqCompleted(req); - } + if (req.isComplete) { + _onReqCompleted(req); + } + } else { + Logging.instance.log( + "_dataHandler found a null req!", + level: LogLevel.Warning, + ); + } + }); } void _errorHandler(Object error, StackTrace trace) { - Logging.instance.log( - "JsonRPC errorHandler: $error\n$trace", - level: LogLevel.Error, - ); - - final req = _requestQueue.next; - req.completer.completeError(error, trace); - _onReqCompleted(req); + _requestQueue.nextIncompleteReq.then((req) { + if (req != null) { + req.completer.completeError(error, trace); + _onReqCompleted(req); + } + }); } void _doneHandler() { - Logging.instance.log( - "JsonRPC doneHandler: " - "connection closed to $host:$port, destroying socket", - level: LogLevel.Info, - ); - - if (_requestQueue.isNotEmpty) { - Logging.instance.log( - "JsonRPC doneHandler: queue not empty but connection closed, " - "completing pending requests with errors", - level: LogLevel.Error, - ); - - for (final req in _requestQueue.queue) { - if (!req.isComplete) { - try { - throw Exception( - "JsonRPC doneHandler: socket closed " - "before request could complete", - ); - } catch (e, s) { - req.completer.completeError(e, s); - } - } - } - _requestQueue.clear(); - } - - disconnect(); + disconnect(reason: "JsonRPC _doneHandler() called"); } void _onReqCompleted(_JsonRPCRequest req) { - _requestQueue.remove(req); - if (_requestQueue.isNotEmpty) { + _requestQueue.remove(req).then((_) { + // attempt to send next request _sendNextAvailableRequest(); - } + }); } void _sendNextAvailableRequest() { - if (_requestQueue.isEmpty) { - // TODO handle properly - throw Exception("JSON RPC queue empty"); - } + _requestQueue.nextIncompleteReq.then((req) { + if (req != null) { + // \r\n required by electrumx server + _socket!.write('${req.jsonRequest}\r\n'); - final req = _requestQueue.next; - - _socket!.write('${req.jsonRequest}\r\n'); - - req.initiateTimeout(const Duration(seconds: 10)); - // Logging.instance.log( - // "JsonRPC request: wrote request ${req.jsonRequest} " - // "to socket $host:$port", - // level: LogLevel.Info, - // ); + // TODO different timeout length? + req.initiateTimeout( + const Duration(seconds: 10), + onTimedOut: () { + _requestQueue.remove(req); + }, + ); + } + }); } - Future request(String jsonRpcRequest) async { - // todo: handle this better? - // Do we need to check the subscription, too? + Future request(String jsonRpcRequest) async { await _requestMutex.protect(() async { if (_socket == null) { Logging.instance.log( @@ -121,51 +91,69 @@ class JsonRPC { final req = _JsonRPCRequest( jsonRequest: jsonRpcRequest, - completer: Completer(), + completer: Completer(), ); - _requestQueue.add(req); + final future = req.completer.future.onError( + (error, stackTrace) async { + await disconnect( + reason: "return req.completer.future.onError: $error\n$stackTrace", + ); + return JsonRPCResponse( + exception: error is Exception + ? error + : Exception( + "req.completer.future.onError: $error\n$stackTrace", + ), + ); + }, + ); // if this is the only/first request then send it right away - if (_requestQueue.length == 1) { - _sendNextAvailableRequest(); - } else { - // Logging.instance.log( - // "JsonRPC request: queued request $jsonRpcRequest " - // "to socket $host:$port", - // level: LogLevel.Info, - // ); - } - - return req.completer.future.onError( - (error, stackTrace) => - Exception("return req.completer.future.onError: $error"), + await _requestQueue.add( + req, + onInitialRequestAdded: _sendNextAvailableRequest, ); + + return future; } - void disconnect() { - // TODO: maybe clear req queue here and wrap in mutex? - _subscription?.cancel().then((_) => _subscription = null); - _socket?.destroy(); - _socket = null; + Future disconnect({required String reason}) async { + await _requestMutex.protect(() async { + await _subscription?.cancel(); + _subscription = null; + _socket?.destroy(); + _socket = null; + + // clean up remaining queue + await _requestQueue.completeRemainingWithError( + "JsonRPC disconnect() called with reason: \"$reason\"", + ); + }); } Future connect() async { + if (_socket != null) { + throw Exception( + "JsonRPC attempted to connect to an already existing socket!", + ); + } + if (useSSL) { - _socket ??= await SecureSocket.connect( + _socket = await SecureSocket.connect( host, port, timeout: connectionTimeout, onBadCertificate: (_) => true, ); // TODO do not automatically trust bad certificates } else { - _socket ??= await Socket.connect( + _socket = await Socket.connect( host, port, timeout: connectionTimeout, ); } - await _subscription?.cancel(); + _subscription = _socket!.listen( _dataHandler, onError: _errorHandler, @@ -176,36 +164,85 @@ class JsonRPC { } class _JsonRPCRequestQueue { + final _lock = Mutex(); final List<_JsonRPCRequest> _rq = []; - void add(_JsonRPCRequest req) => _rq.add(req); + Future add( + _JsonRPCRequest req, { + VoidCallback? onInitialRequestAdded, + }) async { + return await _lock.protect(() async { + _rq.add(req); + if (_rq.length == 1) { + onInitialRequestAdded?.call(); + } + }); + } - bool remove(_JsonRPCRequest req) => _rq.remove(req); + Future remove(_JsonRPCRequest req) async { + return await _lock.protect(() async { + final result = _rq.remove(req); + return result; + }); + } - void clear() => _rq.clear(); + Future<_JsonRPCRequest?> get nextIncompleteReq async { + return await _lock.protect(() async { + int removeCount = 0; + _JsonRPCRequest? returnValue; + for (final req in _rq) { + if (req.isComplete) { + removeCount++; + } else { + returnValue = req; + break; + } + } - bool get isEmpty => _rq.isEmpty; - bool get isNotEmpty => _rq.isNotEmpty; - int get length => _rq.length; - _JsonRPCRequest get next => _rq.first; - List<_JsonRPCRequest> get queue => _rq.toList(growable: false); + _rq.removeRange(0, removeCount); + + return returnValue; + }); + } + + Future completeRemainingWithError( + String error, { + StackTrace? stackTrace, + }) async { + await _lock.protect(() async { + for (final req in _rq) { + if (!req.isComplete) { + req.completer.completeError(Exception(error), stackTrace); + } + } + _rq.clear(); + }); + } + + Future get isEmpty async { + return await _lock.protect(() async { + return _rq.isEmpty; + }); + } } class _JsonRPCRequest { + // 0x0A is newline + // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html + static const int separatorByte = 0x0A; + final String jsonRequest; - final Completer completer; + final Completer completer; final List _responseData = []; _JsonRPCRequest({required this.jsonRequest, required this.completer}); void appendDataAndCheckIfComplete(List data) { _responseData.addAll(data); - // 0x0A is newline - // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html - if (data.last == 0x0A) { + if (data.last == separatorByte) { try { final response = json.decode(String.fromCharCodes(_responseData)); - completer.complete(response); + completer.complete(JsonRPCResponse(data: response)); } catch (e, s) { Logging.instance.log( "JsonRPC json.decode: $e\n$s", @@ -216,13 +253,17 @@ class _JsonRPCRequest { } } - void initiateTimeout(Duration timeout) { + void initiateTimeout( + Duration timeout, { + VoidCallback? onTimedOut, + }) { Future.delayed(timeout).then((_) { if (!isComplete) { try { throw Exception("_JsonRPCRequest timed out: $jsonRequest"); } catch (e, s) { completer.completeError(e, s); + onTimedOut?.call(); } } }); @@ -230,3 +271,10 @@ class _JsonRPCRequest { bool get isComplete => completer.isCompleted; } + +class JsonRPCResponse { + final dynamic data; + final Exception? exception; + + JsonRPCResponse({this.data, this.exception}); +} diff --git a/lib/electrumx_rpc/rpc2.dart b/lib/electrumx_rpc/rpc2.dart deleted file mode 100644 index 15d2076cc..000000000 --- a/lib/electrumx_rpc/rpc2.dart +++ /dev/null @@ -1,261 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:flutter/foundation.dart'; -import 'package:mutex/mutex.dart'; -import 'package:stackwallet/utilities/logger.dart'; - -// hacky fix to receive large jsonrpc responses -class JsonRPC { - JsonRPC({ - required this.host, - required this.port, - this.useSSL = false, - this.connectionTimeout = const Duration(seconds: 60), - }); - final bool useSSL; - final String host; - final int port; - final Duration connectionTimeout; - - final _requestMutex = Mutex(); - final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue(); - Socket? _socket; - StreamSubscription? _subscription; - - void _dataHandler(List data) { - _requestQueue.nextIncompleteReq.then((req) { - if (req != null) { - req.appendDataAndCheckIfComplete(data); - - if (req.isComplete) { - _onReqCompleted(req); - } - } else { - Logging.instance.log( - "_dataHandler found a null req!", - level: LogLevel.Warning, - ); - } - }); - } - - void _errorHandler(Object error, StackTrace trace) { - _requestQueue.nextIncompleteReq.then((req) { - if (req != null) { - req.completer.completeError(error, trace); - _onReqCompleted(req); - } else { - Logging.instance.log( - "_errorHandler found a null req!", - level: LogLevel.Warning, - ); - } - }); - } - - void _doneHandler() { - Logging.instance.log( - "JsonRPC doneHandler: " - "connection closed to $host:$port, destroying socket", - level: LogLevel.Info, - ); - - disconnect(reason: "JsonRPC _doneHandler() called"); - } - - void _onReqCompleted(_JsonRPCRequest req) { - _requestQueue.remove(req).then((value) { - if (kDebugMode) { - print("Request removed from queue: $value"); - } - // attempt to send next request - _sendNextAvailableRequest(); - }); - } - - void _sendNextAvailableRequest() { - _requestQueue.nextIncompleteReq.then((req) { - if (req != null) { - // \r\n required by electrumx server - _socket!.write('${req.jsonRequest}\r\n'); - - // TODO different timeout length? - req.initiateTimeout(const Duration(seconds: 10)); - } else { - Logging.instance.log( - "_sendNextAvailableRequest found a null req!", - level: LogLevel.Warning, - ); - } - }); - } - - // TODO: non dynamic type - Future request(String jsonRpcRequest) async { - await _requestMutex.protect(() async { - if (_socket == null) { - Logging.instance.log( - "JsonRPC request: opening socket $host:$port", - level: LogLevel.Info, - ); - await connect(); - } - }); - - final req = _JsonRPCRequest( - jsonRequest: jsonRpcRequest, - completer: Completer(), - ); - - // if this is the only/first request then send it right away - await _requestQueue.add( - req, - onInitialRequestAdded: _sendNextAvailableRequest, - ); - - return req.completer.future.onError( - (error, stackTrace) => Exception( - "return req.completer.future.onError: $error", - ), - ); - } - - Future disconnect({String reason = "none"}) async { - await _requestMutex.protect(() async { - await _subscription?.cancel(); - _subscription = null; - _socket?.destroy(); - _socket = null; - - // clean up remaining queue - await _requestQueue.completeRemainingWithError( - "JsonRPC disconnect() called with reason: \"$reason\"", - ); - }); - } - - Future connect() async { - if (_socket != null) { - throw Exception( - "JsonRPC attempted to connect to an already existing socket!", - ); - } - - if (useSSL) { - _socket = await SecureSocket.connect( - host, - port, - timeout: connectionTimeout, - onBadCertificate: (_) => true, - ); // TODO do not automatically trust bad certificates - } else { - _socket = await Socket.connect( - host, - port, - timeout: connectionTimeout, - ); - } - - _subscription = _socket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); - } -} - -class _JsonRPCRequestQueue { - final m = Mutex(); - - final List<_JsonRPCRequest> _rq = []; - - Future add( - _JsonRPCRequest req, { - VoidCallback? onInitialRequestAdded, - }) async { - return await m.protect(() async { - _rq.add(req); - if (_rq.length == 1) { - onInitialRequestAdded?.call(); - } - }); - } - - Future remove(_JsonRPCRequest req) async { - return await m.protect(() async => _rq.remove(req)); - } - - Future<_JsonRPCRequest?> get nextIncompleteReq async { - return await m.protect(() async { - try { - return _rq.firstWhere((e) => !e.isComplete); - } catch (_) { - // no incomplete requests found - return null; - } - }); - } - - Future completeRemainingWithError( - String error, { - StackTrace? stackTrace, - }) async { - await m.protect(() async { - for (final req in _rq) { - if (!req.isComplete) { - req.completer.completeError(Exception(error), stackTrace); - } - } - _rq.clear(); - }); - } - - Future get isEmpty async { - return await m.protect(() async { - return _rq.isEmpty; - }); - } -} - -class _JsonRPCRequest { - final String jsonRequest; - final Completer completer; - final List _responseData = []; - - _JsonRPCRequest({required this.jsonRequest, required this.completer}); - - void appendDataAndCheckIfComplete(List data) { - _responseData.addAll(data); - // 0x0A is newline - // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html - if (data.last == 0x0A) { - try { - final response = json.decode(String.fromCharCodes(_responseData)); - completer.complete(response); - } catch (e, s) { - Logging.instance.log( - "JsonRPC json.decode: $e\n$s", - level: LogLevel.Error, - ); - completer.completeError(e, s); - } - } - } - - void initiateTimeout(Duration timeout) { - Future.delayed(timeout).then((_) { - if (!isComplete) { - try { - throw Exception("_JsonRPCRequest timed out: $jsonRequest"); - } catch (e, s) { - completer.completeError(e, s); - } - } - }); - } - - bool get isComplete => completer.isCompleted; -}