rpc2.dart -> rpc.dart

This commit is contained in:
Josh Babb 2023-05-25 15:52:07 -05:00
parent 73312cb920
commit 69a085e9a6
6 changed files with 185 additions and 313 deletions

View file

@ -3,7 +3,7 @@ import 'dart:io';
import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:decimal/decimal.dart'; import 'package:decimal/decimal.dart';
import 'package:stackwallet/electrumx_rpc/rpc2.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart'; import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart';
import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/prefs.dart'; import 'package:stackwallet/utilities/prefs.dart';

View file

@ -2,8 +2,8 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:mutex/mutex.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/logger.dart';
// hacky fix to receive large jsonrpc responses // hacky fix to receive large jsonrpc responses
@ -19,110 +19,204 @@ class JsonRPC {
final int port; final int port;
final Duration connectionTimeout; final Duration connectionTimeout;
Socket? socket; final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
Socket? _socket;
StreamSubscription<Uint8List>? _subscription; StreamSubscription<Uint8List>? _subscription;
final m = Mutex(); void _dataHandler(List<int> data) {
if (_requestQueue.isEmpty) {
// probably just return although this case should never actually hit
// TODO anything else here?
return;
}
void Function(List<int>)? _onData; final req = _requestQueue.next;
void Function(Object, StackTrace)? _onError; req.appendDataAndCheckIfComplete(data);
if (req.isComplete) {
_onReqCompleted(req);
}
}
void _errorHandler(Object error, StackTrace trace) {
Logging.instance.log(
"JsonRPC errorHandler: $error\n$trace",
level: LogLevel.Error,
);
final req = _requestQueue.next;
req.completer.completeError(error, trace);
_onReqCompleted(req);
}
void _doneHandler() {
Logging.instance.log(
"JsonRPC doneHandler: "
"connection closed to ${_socket?.address}:${_socket?.port}, destroying socket",
level: LogLevel.Info,
);
_socket?.destroy();
_socket = null; // is this redundant?
// should we also cancel and/or null the subscription?
if (_requestQueue.isNotEmpty) {
// TODO iterate over the remaining requests and if they are not isComplete then complete the completer with an error
Logging.instance.log(
"JsonRPC doneHandler: queue not empty but connection closed, completing pending requests with errors",
level: LogLevel.Error,
);
_errorPendingRequests();
}
}
Future<void> _onReqCompleted(_JsonRPCRequest req) async {
await _requestQueue.remove(req);
if (_requestQueue.isNotEmpty) {
_sendNextAvailableRequest();
}
}
void _sendNextAvailableRequest() {
if (_requestQueue.isEmpty) {
// TODO handle properly
throw Exception("JSON RPC queue empty");
}
final req = _requestQueue.next;
_socket!.write('${req.jsonRequest}\r\n');
Logging.instance.log(
"JsonRPC request: wrote request ${req.jsonRequest} "
"to socket ${_socket?.address}:${_socket?.port}",
level: LogLevel.Info,
);
}
void _errorPendingRequests() {
if (_requestQueue.isNotEmpty) {
final req = _requestQueue.next;
if (!(req.isComplete)) {
req.completer.completeError('JsonRPC doneHandler: socket closed before request could complete');
_requestQueue.remove(req).then((ret) {
if (_requestQueue.isNotEmpty) {
_errorPendingRequests();
}
});
}
} else {
Logging.instance.log(
"JsonRPC _errorPendingRequests: done completing pending requests with errors",
level: LogLevel.Info,
);
}
}
Future<dynamic> request(String jsonRpcRequest) async { Future<dynamic> request(String jsonRpcRequest) async {
final completer = Completer<dynamic>(); // todo: handle this better?
final List<int> responseData = []; // Do we need to check the subscription, too?
if (_socket == null) {
void dataHandler(List<int> data) {
responseData.addAll(data);
// 0x0A is newline
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
if (data.last == 0x0A) {
try {
final response = json.decode(String.fromCharCodes(responseData));
completer.complete(response); // TODO only complete on last chunk?
} catch (e, s) {
Logging.instance
.log("JsonRPC json.decode: $e\n$s", level: LogLevel.Error);
completer.completeError(e, s);
} finally {
Logging.instance.log(
"JsonRPC dataHandler: not destroying socket ${socket?.address}:${socket?.port}",
level: LogLevel.Info,
);
// socket?.destroy();
// TODO is this all we need to do?
}
}
}
_onData = dataHandler;
void errorHandler(Object error, StackTrace trace) {
Logging.instance
.log("JsonRPC errorHandler: $error\n$trace", level: LogLevel.Error);
completer.completeError(error, trace);
Logging.instance.log(
"JsonRPC errorHandler: not destroying socket ${socket?.address}:${socket?.port}",
level: LogLevel.Info,
);
// socket?.destroy();
// TODO do we need to recreate the socket?
}
_onError = errorHandler;
void doneHandler() {
Logging.instance.log(
"JsonRPC doneHandler: not destroying socket ${socket?.address}:${socket?.port}",
level: LogLevel.Info,
);
// socket?.destroy();
m.release();
// TODO is this all we need?
}
if (socket != null) {
// TODO check if the socket is valid, alive, connected, etc
} else {
Logging.instance.log( Logging.instance.log(
"JsonRPC request: opening socket $host:$port", "JsonRPC request: opening socket $host:$port",
level: LogLevel.Info, level: LogLevel.Info,
); );
await connect();
} }
// Do we need to check the subscription, too?
await m.acquire(); final req = _JsonRPCRequest(
jsonRequest: jsonRpcRequest,
completer: Completer<dynamic>(),
);
if (useSSL) { await _requestQueue.add(req);
socket ??= await SecureSocket.connect(host, port,
timeout: connectionTimeout, onBadCertificate: (_) => true); // TODO do not automatically trust bad certificates // if this is the only/first request then send it right away
_subscription ??= socket!.listen( if (_requestQueue.length == 1) {
_onData, _sendNextAvailableRequest();
onError: _onError,
onDone: doneHandler,
cancelOnError: true,
);
} else { } else {
socket ??= await Socket.connect( Logging.instance.log(
"JsonRPC request: queued request $jsonRpcRequest "
"to socket ${_socket?.address}:${_socket?.port}",
level: LogLevel.Info,
);
}
return req.completer.future;
}
Future<void> disconnect() async {
await _subscription?.cancel();
_subscription = null;
_socket?.destroy();
}
Future<void> connect() async {
if (useSSL) {
_socket ??= await SecureSocket.connect(
host,
port,
timeout: connectionTimeout,
onBadCertificate: (_) => true,
); // TODO do not automatically trust bad certificates
} else {
_socket ??= await Socket.connect(
host, host,
port, port,
timeout: connectionTimeout, timeout: connectionTimeout,
); );
_subscription ??= socket!.listen(
_onData,
onError: _onError,
onDone: doneHandler,
cancelOnError: true,
);
} }
await _subscription?.cancel();
socket!.write('$jsonRpcRequest\r\n'); _subscription = _socket!.listen(
_dataHandler,
Logging.instance.log( onError: _errorHandler,
"JsonRPC request: wrote request $jsonRpcRequest to socket ${socket?.address}:${socket?.port}", onDone: _doneHandler,
level: LogLevel.Info, cancelOnError: true,
); );
return completer.future;
} }
} }
// mutex *may* not be needed as the protected functions are not async
class _JsonRPCRequestQueue {
final _m = Mutex();
final List<_JsonRPCRequest> _rq = [];
Future<void> add(_JsonRPCRequest req) async {
await _m.protect(() async => _rq.add(req));
}
Future<void> remove(_JsonRPCRequest req) async {
await _m.protect(() async => _rq.remove(req));
}
bool get isEmpty => _rq.isEmpty;
bool get isNotEmpty => _rq.isNotEmpty;
int get length => _rq.length;
_JsonRPCRequest get next => _rq.first;
}
class _JsonRPCRequest {
final String jsonRequest;
final Completer<dynamic> completer;
final List<int> _responseData = [];
_JsonRPCRequest({required this.jsonRequest, required this.completer});
void appendDataAndCheckIfComplete(List<int> data) {
_responseData.addAll(data);
// 0x0A is newline
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
if (data.last == 0x0A) {
try {
final response = json.decode(String.fromCharCodes(_responseData));
completer.complete(response);
} catch (e, s) {
Logging.instance.log(
"JsonRPC json.decode: $e\n$s",
level: LogLevel.Error,
);
completer.completeError(e, s);
}
}
}
bool get isComplete => completer.isCompleted;
}

View file

@ -1,222 +0,0 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/utilities/logger.dart';
// hacky fix to receive large jsonrpc responses
class JsonRPC {
JsonRPC({
required this.host,
required this.port,
this.useSSL = false,
this.connectionTimeout = const Duration(seconds: 60),
});
final bool useSSL;
final String host;
final int port;
final Duration connectionTimeout;
final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
Socket? _socket;
StreamSubscription<Uint8List>? _subscription;
void _dataHandler(List<int> data) {
if (_requestQueue.isEmpty) {
// probably just return although this case should never actually hit
// TODO anything else here?
return;
}
final req = _requestQueue.next;
req.appendDataAndCheckIfComplete(data);
if (req.isComplete) {
_onReqCompleted(req);
}
}
void _errorHandler(Object error, StackTrace trace) {
Logging.instance.log(
"JsonRPC errorHandler: $error\n$trace",
level: LogLevel.Error,
);
final req = _requestQueue.next;
req.completer.completeError(error, trace);
_onReqCompleted(req);
}
void _doneHandler() {
Logging.instance.log(
"JsonRPC doneHandler: "
"connection closed to ${_socket?.address}:${_socket?.port}, destroying socket",
level: LogLevel.Info,
);
_socket?.destroy();
_socket = null; // is this redundant?
// should we also cancel and/or null the subscription?
if (_requestQueue.isNotEmpty) {
// TODO iterate over the remaining requests and if they are not isComplete then complete the completer with an error
Logging.instance.log(
"JsonRPC doneHandler: queue not empty but connection closed, completing pending requests with errors",
level: LogLevel.Error,
);
_errorPendingRequests();
}
}
Future<void> _onReqCompleted(_JsonRPCRequest req) async {
await _requestQueue.remove(req);
if (_requestQueue.isNotEmpty) {
_sendNextAvailableRequest();
}
}
void _sendNextAvailableRequest() {
if (_requestQueue.isEmpty) {
// TODO handle properly
throw Exception("JSON RPC queue empty");
}
final req = _requestQueue.next;
_socket!.write('${req.jsonRequest}\r\n');
Logging.instance.log(
"JsonRPC request: wrote request ${req.jsonRequest} "
"to socket ${_socket?.address}:${_socket?.port}",
level: LogLevel.Info,
);
}
void _errorPendingRequests() {
if (_requestQueue.isNotEmpty) {
final req = _requestQueue.next;
if (!(req.isComplete)) {
req.completer.completeError('JsonRPC doneHandler: socket closed before request could complete');
_requestQueue.remove(req).then((ret) {
if (_requestQueue.isNotEmpty) {
_errorPendingRequests();
}
});
}
} else {
Logging.instance.log(
"JsonRPC _errorPendingRequests: done completing pending requests with errors",
level: LogLevel.Info,
);
}
}
Future<dynamic> request(String jsonRpcRequest) async {
// todo: handle this better?
// Do we need to check the subscription, too?
if (_socket == null) {
Logging.instance.log(
"JsonRPC request: opening socket $host:$port",
level: LogLevel.Info,
);
await connect();
}
final req = _JsonRPCRequest(
jsonRequest: jsonRpcRequest,
completer: Completer<dynamic>(),
);
await _requestQueue.add(req);
// if this is the only/first request then send it right away
if (_requestQueue.length == 1) {
_sendNextAvailableRequest();
} else {
Logging.instance.log(
"JsonRPC request: queued request $jsonRpcRequest "
"to socket ${_socket?.address}:${_socket?.port}",
level: LogLevel.Info,
);
}
return req.completer.future;
}
Future<void> disconnect() async {
await _subscription?.cancel();
_subscription = null;
_socket?.destroy();
}
Future<void> connect() async {
if (useSSL) {
_socket ??= await SecureSocket.connect(
host,
port,
timeout: connectionTimeout,
onBadCertificate: (_) => true,
); // TODO do not automatically trust bad certificates
} else {
_socket ??= await Socket.connect(
host,
port,
timeout: connectionTimeout,
);
}
await _subscription?.cancel();
_subscription = _socket!.listen(
_dataHandler,
onError: _errorHandler,
onDone: _doneHandler,
cancelOnError: true,
);
}
}
// mutex *may* not be needed as the protected functions are not async
class _JsonRPCRequestQueue {
final _m = Mutex();
final List<_JsonRPCRequest> _rq = [];
Future<void> add(_JsonRPCRequest req) async {
await _m.protect(() async => _rq.add(req));
}
Future<void> remove(_JsonRPCRequest req) async {
await _m.protect(() async => _rq.remove(req));
}
bool get isEmpty => _rq.isEmpty;
bool get isNotEmpty => _rq.isNotEmpty;
int get length => _rq.length;
_JsonRPCRequest get next => _rq.first;
}
class _JsonRPCRequest {
final String jsonRequest;
final Completer<dynamic> completer;
final List<int> _responseData = [];
_JsonRPCRequest({required this.jsonRequest, required this.completer});
void appendDataAndCheckIfComplete(List<int> data) {
_responseData.addAll(data);
// 0x0A is newline
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
if (data.last == 0x0A) {
try {
final response = json.decode(String.fromCharCodes(_responseData));
completer.complete(response);
} catch (e, s) {
Logging.instance.log(
"JsonRPC json.decode: $e\n$s",
level: LogLevel.Error,
);
completer.completeError(e, s);
}
}
}
bool get isComplete => completer.isCompleted;
}

View file

@ -2,7 +2,7 @@ import 'package:flutter_test/flutter_test.dart';
import 'package:mockito/annotations.dart'; import 'package:mockito/annotations.dart';
import 'package:mockito/mockito.dart'; import 'package:mockito/mockito.dart';
import 'package:stackwallet/electrumx_rpc/electrumx.dart'; import 'package:stackwallet/electrumx_rpc/electrumx.dart';
import 'package:stackwallet/electrumx_rpc/rpc2.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/utilities/prefs.dart'; import 'package:stackwallet/utilities/prefs.dart';
import 'electrumx_test.mocks.dart'; import 'electrumx_test.mocks.dart';

View file

@ -7,7 +7,7 @@ import 'dart:async' as _i3;
import 'dart:ui' as _i7; import 'dart:ui' as _i7;
import 'package:mockito/mockito.dart' as _i1; import 'package:mockito/mockito.dart' as _i1;
import 'package:stackwallet/electrumx_rpc/rpc2.dart' as _i2; import 'package:stackwallet/electrumx_rpc/rpc.dart' as _i2;
import 'package:stackwallet/utilities/enums/backup_frequency_type.dart' as _i6; import 'package:stackwallet/utilities/enums/backup_frequency_type.dart' as _i6;
import 'package:stackwallet/utilities/enums/sync_type_enum.dart' as _i5; import 'package:stackwallet/utilities/enums/sync_type_enum.dart' as _i5;
import 'package:stackwallet/utilities/prefs.dart' as _i4; import 'package:stackwallet/utilities/prefs.dart' as _i4;

View file

@ -1,7 +1,7 @@
import 'dart:io'; import 'dart:io';
import 'package:flutter_test/flutter_test.dart'; import 'package:flutter_test/flutter_test.dart';
import 'package:stackwallet/electrumx_rpc/rpc2.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/utilities/default_nodes.dart'; import 'package:stackwallet/utilities/default_nodes.dart';
void main() { void main() {