diff --git a/lib/electrumx_rpc/rpc.dart b/lib/electrumx_rpc/rpc.dart index af1952acf..bbc71442b 100644 --- a/lib/electrumx_rpc/rpc.dart +++ b/lib/electrumx_rpc/rpc.dart @@ -26,7 +26,6 @@ class JsonRPC { void _dataHandler(List data) { if (_requestQueue.isEmpty) { // probably just return although this case should never actually hit - // TODO anything else here? return; } @@ -52,20 +51,23 @@ class JsonRPC { void _doneHandler() { Logging.instance.log( "JsonRPC doneHandler: " - "connection closed to ${_socket?.address}:${_socket?.port}, destroying socket", + "connection closed to ${_socket?.address}:$port, destroying socket", level: LogLevel.Info, ); - _socket?.destroy(); - _socket = null; // is this redundant? - // should we also cancel and/or null the subscription? if (_requestQueue.isNotEmpty) { Logging.instance.log( - "JsonRPC doneHandler: queue not empty but connection closed, completing pending requests with errors", + "JsonRPC doneHandler: queue not empty but connection closed, " + "completing pending requests with errors", level: LogLevel.Error, ); - _errorPendingRequests(); + _requestQueue.clear( + errorMessage: "JsonRPC doneHandler: socket closed " + "before request could complete", + ); } + + disconnect(); } Future _onReqCompleted(_JsonRPCRequest req) async { @@ -91,25 +93,6 @@ class JsonRPC { ); } - void _errorPendingRequests() { - if (_requestQueue.isNotEmpty) { - final req = _requestQueue.next; - if (!(req.isComplete)) { - req.completer.completeError('JsonRPC doneHandler: socket closed before request could complete'); - _requestQueue.remove(req).then((ret) { - if (_requestQueue.isNotEmpty) { - _errorPendingRequests(); - } - }); - } - } else { - Logging.instance.log( - "JsonRPC _errorPendingRequests: done completing pending requests with errors", - level: LogLevel.Info, - ); - } - } - Future request(String jsonRpcRequest) async { // todo: handle this better? // Do we need to check the subscription, too? @@ -134,7 +117,7 @@ class JsonRPC { } else { Logging.instance.log( "JsonRPC request: queued request $jsonRpcRequest " - "to socket ${_socket?.address}:${_socket?.port}", + "to socket ${_socket?.address}:$port", level: LogLevel.Info, ); } @@ -142,10 +125,10 @@ class JsonRPC { return req.completer.future; } - Future disconnect() async { - await _subscription?.cancel(); - _subscription = null; + void disconnect() { + _subscription?.cancel().then((_) => _subscription = null); _socket?.destroy(); + _socket = null; } Future connect() async { @@ -186,6 +169,18 @@ class _JsonRPCRequestQueue { await _m.protect(() async => _rq.remove(req)); } + Future clear({required String errorMessage}) async { + await _m.protect(() async { + for (final req in _rq) { + if (!req.isComplete) { + req.completer.completeError(errorMessage); + } + } + + _rq.clear(); + }); + } + bool get isEmpty => _rq.isEmpty; bool get isNotEmpty => _rq.isNotEmpty; int get length => _rq.length;