stack_wallet/lib/electrumx_rpc/rpc.dart

233 lines
5.8 KiB
Dart
Raw Normal View History

2022-08-26 08:11:35 +00:00
import 'dart:async';
import 'dart:convert';
import 'dart:io';
2023-05-24 18:27:19 +00:00
import 'dart:typed_data';
2022-08-26 08:11:35 +00:00
import 'package:mutex/mutex.dart';
2022-08-26 08:11:35 +00:00
import 'package:stackwallet/utilities/logger.dart';
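// Hacky fix to receive large JSON-RPC responses: a response may arrive split
// across several socket reads, so chunks are buffered until a
// newline-terminated message is complete.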
/// A JSON-RPC client that talks to [host]:[port] over a plain or TLS socket.
///
/// Requests are kept in a FIFO queue and written to the socket one at a time:
/// the next request is only sent once the request at the head of the queue
/// has completed. Incoming socket data is always attributed to the request at
/// the head of the queue.
class JsonRPC {
  JsonRPC({
    required this.host,
    required this.port,
    this.useSSL = false,
    this.connectionTimeout = const Duration(seconds: 60),
    this.requestTimeout = const Duration(seconds: 10),
  });

  /// Whether to connect with [SecureSocket] instead of a plain [Socket].
  final bool useSSL;

  /// Host name or address of the server.
  final String host;

  /// TCP port of the server.
  final int port;

  /// Maximum time allowed for establishing the socket connection.
  final Duration connectionTimeout;

  /// Time after which a request that has been written to the socket is
  /// completed with a timeout error if no full response has arrived.
  final Duration requestTimeout;

  /// Serializes the "connect if there is no socket yet" check in [request].
  final _requestMutex = Mutex();

  final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
  Socket? _socket;
  StreamSubscription<Uint8List>? _subscription;

  /// Feeds a chunk of socket data to the request at the head of the queue.
  void _dataHandler(List<int> data) {
    if (_requestQueue.isEmpty) {
      // probably just return although this case should never actually hit
      return;
    }

    final req = _requestQueue.next;

    if (req.isComplete) {
      // The head request was already completed elsewhere (e.g. it timed out)
      // but is still queued. Handing it more data would try to complete its
      // completer a second time, so drop the chunk and move the queue along.
      // NOTE(review): any further chunks of that late response will be
      // attributed to the next request — responses are not matched by id.
      _onReqCompleted(req);
      return;
    }

    req.appendDataAndCheckIfComplete(data);

    if (req.isComplete) {
      _onReqCompleted(req);
    }
  }

  /// Logs a socket error and fails the request at the head of the queue.
  void _errorHandler(Object error, StackTrace trace) {
    Logging.instance.log(
      "JsonRPC errorHandler: $error\n$trace",
      level: LogLevel.Error,
    );

    // The socket can report an error while nothing is pending.
    if (_requestQueue.isEmpty) {
      return;
    }

    final req = _requestQueue.next;
    // The request may already have been completed (e.g. by its timeout);
    // completing a completer twice throws.
    if (!req.isComplete) {
      req.completer.completeError(error, trace);
    }
    _onReqCompleted(req);
  }

  /// Handles the socket closing: fails every pending request, empties the
  /// queue and tears the connection down.
  void _doneHandler() {
    Logging.instance.log(
      "JsonRPC doneHandler: "
      "connection closed to $host:$port, destroying socket",
      level: LogLevel.Info,
    );

    if (_requestQueue.isNotEmpty) {
      Logging.instance.log(
        "JsonRPC doneHandler: queue not empty but connection closed, "
        "completing pending requests with errors",
        level: LogLevel.Error,
      );

      for (final req in _requestQueue.queue) {
        if (!req.isComplete) {
          req.completer.completeError(
            Exception(
              "JsonRPC doneHandler: socket closed "
              "before request could complete",
            ),
            StackTrace.current,
          );
        }
      }

      _requestQueue.clear();
    }

    disconnect();
  }

  /// Removes [req] from the queue and sends the next queued request, if any.
  void _onReqCompleted(_JsonRPCRequest req) {
    _requestQueue.remove(req);
    if (_requestQueue.isNotEmpty) {
      _sendNextAvailableRequest();
    }
  }

  /// Writes the request at the head of the queue to the socket and starts its
  /// timeout.
  ///
  /// Throws if the queue is empty or if there is no open socket.
  void _sendNextAvailableRequest() {
    if (_requestQueue.isEmpty) {
      // TODO handle properly
      throw Exception("JSON RPC queue empty");
    }

    final req = _requestQueue.next;

    _socket!.write('${req.jsonRequest}\r\n');

    // NOTE(review): a timeout completes the request with an error but does
    // not remove it from the queue; it is only removed once more socket data,
    // a socket error or a socket close is observed.
    req.initiateTimeout(requestTimeout);

    // Logging.instance.log(
    //   "JsonRPC request: wrote request ${req.jsonRequest} "
    //   "to socket $host:$port",
    //   level: LogLevel.Info,
    // );
  }

  /// Queues [jsonRpcRequest] (an already-encoded JSON-RPC request string) and
  /// returns a future for its decoded response.
  ///
  /// Opens the socket first if there is none. If the request itself fails
  /// (socket error, socket closed, timeout, undecodable response) the
  /// returned future does NOT throw; it resolves to an [Exception] value
  /// describing the failure, so callers must check the result type.
  /// Failures while connecting are thrown as usual.
  Future<dynamic> request(String jsonRpcRequest) async {
    // todo: handle this better?
    // Do we need to check the subscription, too?
    await _requestMutex.protect(() async {
      if (_socket == null) {
        Logging.instance.log(
          "JsonRPC request: opening socket $host:$port",
          level: LogLevel.Info,
        );
        await connect();
      }
    });

    final req = _JsonRPCRequest(
      jsonRequest: jsonRpcRequest,
      completer: Completer<dynamic>(),
    );

    _requestQueue.add(req);

    // if this is the only/first request then send it right away
    if (_requestQueue.length == 1) {
      _sendNextAvailableRequest();
    } else {
      // Logging.instance.log(
      //   "JsonRPC request: queued request $jsonRpcRequest "
      //   "to socket $host:$port",
      //   level: LogLevel.Info,
      // );
    }

    return req.completer.future.onError(
      (error, stackTrace) =>
          Exception("return req.completer.future.onError: $error"),
    );
  }

  /// Cancels the socket subscription and destroys the socket.
  ///
  /// Pending requests are left in the queue.
  void disconnect() {
    // TODO: maybe clear req queue here and wrap in mutex?

    // Detach the field synchronously. Clearing it in a callback that runs
    // after the cancel finishes could wipe out a newer subscription created
    // by a connect() call that ran in the meantime.
    final subscription = _subscription;
    _subscription = null;
    subscription?.cancel();

    _socket?.destroy();
    _socket = null;
  }

  /// Opens the socket (TLS when [useSSL] is set) unless one already exists,
  /// then (re)subscribes the data/error/done handlers to it.
  Future<void> connect() async {
    if (useSSL) {
      // SECURITY: every certificate is accepted, including invalid ones.
      _socket ??= await SecureSocket.connect(
        host,
        port,
        timeout: connectionTimeout,
        onBadCertificate: (_) => true,
      ); // TODO do not automatically trust bad certificates
    } else {
      _socket ??= await Socket.connect(
        host,
        port,
        timeout: connectionTimeout,
      );
    }

    await _subscription?.cancel();
    _subscription = _socket!.listen(
      _dataHandler,
      onError: _errorHandler,
      onDone: _doneHandler,
      cancelOnError: true,
    );
  }
}
2022-08-26 08:11:35 +00:00
2023-05-25 20:52:07 +00:00
/// Ordered collection of pending [_JsonRPCRequest]s, oldest first.
class _JsonRPCRequestQueue {
  /// Backing storage; index 0 is the head of the queue.
  final _rq = <_JsonRPCRequest>[];

  /// Appends [req] to the end of the queue.
  void add(_JsonRPCRequest req) {
    _rq.add(req);
  }

  /// Removes the first occurrence of [req]; returns whether it was present.
  bool remove(_JsonRPCRequest req) {
    return _rq.remove(req);
  }

  /// Drops every queued request.
  void clear() {
    _rq.clear();
  }

  /// Whether no requests are queued.
  bool get isEmpty => _rq.isEmpty;

  /// Whether at least one request is queued.
  bool get isNotEmpty => !isEmpty;

  /// Number of queued requests.
  int get length => _rq.length;

  /// The request at the head of the queue.
  ///
  /// Throws a [StateError] when the queue is empty.
  _JsonRPCRequest get next => _rq.first;

  /// A fixed-length snapshot copy of the queued requests, in order.
  List<_JsonRPCRequest> get queue =>
      List<_JsonRPCRequest>.of(_rq, growable: false);
}
2023-05-24 20:55:55 +00:00
2023-05-25 20:52:07 +00:00
/// A single queued JSON-RPC request together with the completer that
/// delivers its decoded response (or error).
class _JsonRPCRequest {
  _JsonRPCRequest({required this.jsonRequest, required this.completer});

  /// The encoded request as it is written to the socket.
  final String jsonRequest;

  /// Completed with the decoded response, or with an error on failure.
  final Completer<dynamic> completer;

  /// Raw response bytes received so far.
  final List<int> _responseData = [];

  /// Pending timeout started by [initiateTimeout], if any.
  Timer? _timeoutTimer;

  /// Buffers [data] and, once a chunk ending in a newline has arrived,
  /// decodes the buffered bytes as JSON and completes [completer].
  ///
  /// A decoding failure is logged and completes [completer] with that error.
  /// Does nothing if [data] is empty or the request has already completed.
  void appendDataAndCheckIfComplete(List<int> data) {
    // An empty chunk has no last byte, and an already-completed completer
    // (e.g. after a timeout) must not be completed a second time.
    if (data.isEmpty || completer.isCompleted) {
      return;
    }

    _responseData.addAll(data);

    // 0x0A is newline
    // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
    if (data.last != 0x0A) {
      return;
    }

    // The full message is here; the timeout is no longer needed.
    _timeoutTimer?.cancel();
    _timeoutTimer = null;

    try {
      // The bytes are UTF-8 encoded JSON text; treating each byte as a code
      // unit would garble non-ASCII characters. Malformed sequences are
      // replaced rather than rejected so decoding itself never fails here.
      final response = json.decode(
        utf8.decode(_responseData, allowMalformed: true),
      );
      completer.complete(response);
    } catch (e, s) {
      Logging.instance.log(
        "JsonRPC json.decode: $e\n$s",
        level: LogLevel.Error,
      );
      completer.completeError(e, s);
    }
  }

  /// Completes [completer] with a timeout error after [timeout], unless the
  /// request has completed by then.
  void initiateTimeout(Duration timeout) {
    _timeoutTimer?.cancel();
    _timeoutTimer = Timer(timeout, () {
      if (isComplete) {
        return;
      }
      completer.completeError(
        Exception("_JsonRPCRequest timed out: $jsonRequest"),
        StackTrace.current,
      );
    });
  }

  /// Whether [completer] has been completed (with a value or an error).
  bool get isComplete => completer.isCompleted;
}