stack_wallet/lib/electrumx_rpc/rpc.dart

414 lines
11 KiB
Dart
Raw Normal View History

2023-05-26 21:21:16 +00:00
/*
* This file is part of Stack Wallet.
*
* Copyright (c) 2023 Cypher Stack
* All Rights Reserved.
* The code is distributed under GPLv3 license, see LICENSE file for details.
* Generated by Cypher Stack on 2023-05-26
*
*/
2022-08-26 08:11:35 +00:00
import 'dart:async';
import 'dart:convert';
import 'dart:io';
2023-05-29 15:16:25 +00:00
import 'package:flutter/foundation.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart';
2022-08-26 08:11:35 +00:00
import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/prefs.dart';
2023-09-20 04:46:46 +00:00
import 'package:tor_ffi_plugin/socks_socket.dart';
2022-08-26 08:11:35 +00:00
2023-05-29 15:16:25 +00:00
/// JSON-RPC client for a single ElectrumX server reached over a raw socket.
///
/// Requests are queued and written to the socket one at a time: the next
/// request is only sent once the one at the head of the queue has completed.
/// The connection is opened lazily by [request] and is either a direct
/// [Socket]/[SecureSocket] or, when `Prefs.instance.useTor` is set, a
/// [SOCKSSocket] tunnelled through the proxy described by [proxyInfo].
class JsonRPC {
  /// Creates a client for [host]:[port].
  ///
  /// [proxyInfo] is the SOCKS proxy endpoint used when Tor is enabled; it may
  /// be `null`, in which case connecting with Tor enabled throws.
  JsonRPC({
    required this.host,
    required this.port,
    this.useSSL = false,
    this.connectionTimeout = const Duration(seconds: 60),
    // Initializing formal so the argument is actually stored in the field.
    required this.proxyInfo,
  });

  /// Whether to use TLS for the server connection.
  final bool useSSL;

  /// Host name or address of the ElectrumX server.
  final String host;

  /// TCP port of the ElectrumX server.
  final int port;

  /// Timeout applied to the direct (non-Tor) socket connect call.
  final Duration connectionTimeout;

  /// SOCKS proxy endpoint used when Tor is enabled.
  ({InternetAddress host, int port})? proxyInfo;

  /// Serializes connect/disconnect so only one runs at a time.
  final _requestMutex = Mutex();

  final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();

  Socket? _socket;
  SOCKSSocket? _socksSocket;
  StreamSubscription<List<int>>? _subscription;

  /// Feeds a chunk of socket data to the request at the head of the queue.
  void _dataHandler(List<int> data) {
    _requestQueue.nextIncompleteReq.then((req) {
      if (req == null) {
        Logging.instance.log(
          "_dataHandler found a null req!",
          level: LogLevel.Warning,
        );
        return;
      }

      req.appendDataAndCheckIfComplete(data);

      if (req.isComplete) {
        _onReqCompleted(req);
      }
    });
  }

  /// Fails the request at the head of the queue with the socket [error].
  void _errorHandler(Object error, StackTrace trace) {
    _requestQueue.nextIncompleteReq.then((req) {
      if (req != null) {
        req.completer.completeError(error, trace);
        _onReqCompleted(req);
      }
    });
  }

  /// Tears the connection down when the remote side closes the stream.
  void _doneHandler() {
    // Stream callbacks cannot await; mark the future as intentionally dropped.
    unawaited(disconnect(reason: "JsonRPC _doneHandler() called"));
  }

  /// Removes [req] from the queue and then tries to send the next request.
  void _onReqCompleted(_JsonRPCRequest req) {
    _requestQueue.remove(req).then((_) {
      // attempt to send next request
      _sendNextAvailableRequest();
    });
  }

  /// Writes the request at the head of the queue to the active socket and
  /// starts its timeout.
  ///
  /// If the socket that should be used is missing, the request is failed
  /// immediately instead of crashing on a null socket or silently waiting
  /// for the timeout.
  void _sendNextAvailableRequest() {
    _requestQueue.nextIncompleteReq.then((req) {
      if (req == null) {
        return;
      }

      if (!Prefs.instance.useTor) {
        final socket = _socket;
        if (socket == null) {
          _failUnsendableRequest(req, socketName: "_socket");
          return;
        }
        // \r\n required by electrumx server
        socket.write('${req.jsonRequest}\r\n');
      } else {
        final socksSocket = _socksSocket;
        if (socksSocket == null) {
          _failUnsendableRequest(req, socketName: "_socksSocket");
          return;
        }
        // \r\n required by electrumx server
        socksSocket.write('${req.jsonRequest}\r\n');
      }

      // TODO different timeout length?
      req.initiateTimeout(
        onTimedOut: () {
          _onReqCompleted(req);
        },
      );
    });
  }

  /// Logs and fails [req] because the socket named [socketName] is null.
  void _failUnsendableRequest(
    _JsonRPCRequest req, {
    required String socketName,
  }) {
    final message = "JsonRPC _sendNextAvailableRequest attempted with"
        " $socketName=null on $host:$port";
    Logging.instance.log(message, level: LogLevel.Error);

    if (!req.isComplete) {
      req.completer.completeError(
        JsonRpcException(message),
        StackTrace.current,
      );
    }
    _onReqCompleted(req);
  }

  /// Queues [jsonRpcRequest] and returns its response.
  ///
  /// Opens the connection first if the socket for the current mode (direct
  /// or Tor) is not open; [requestTimeout] bounds both that connection
  /// attempt and the request itself. Errors raised while the request is in
  /// flight are returned as a [JsonRPCResponse] carrying an exception and
  /// cause a disconnect. Errors raised while connecting are thrown.
  Future<JsonRPCResponse> request(
    String jsonRpcRequest,
    Duration requestTimeout,
  ) async {
    await _requestMutex.protect(() async {
      final useTor = Prefs.instance.useTor;
      final isConnected = useTor ? _socksSocket != null : _socket != null;
      if (isConnected) {
        return;
      }

      Logging.instance.log(
        useTor
            ? "JsonRPC request: opening SOCKS socket to $host:$port"
            : "JsonRPC request: opening socket $host:$port",
        level: LogLevel.Info,
      );
      await _connect().timeout(requestTimeout, onTimeout: () {
        throw Exception("Request timeout: $jsonRpcRequest");
      });
    });

    final req = _JsonRPCRequest(
      jsonRequest: jsonRpcRequest,
      requestTimeout: requestTimeout,
      completer: Completer<JsonRPCResponse>(),
    );

    // Attach the error handler before queueing so a failure can never
    // surface as an unhandled async error.
    final future = req.completer.future.onError(
      (error, stackTrace) async {
        await disconnect(
          reason: "return req.completer.future.onError: $error\n$stackTrace",
        );
        return JsonRPCResponse(
          exception: error is JsonRpcException
              ? error
              : JsonRpcException(
                  "req.completer.future.onError: $error\n$stackTrace",
                ),
        );
      },
    );

    // if this is the only/first request then send it right away
    await _requestQueue.add(
      req,
      onInitialRequestAdded: _sendNextAvailableRequest,
    );

    return future;
  }

  /// Closes the connection and fails every queued request.
  ///
  /// DO NOT set [ignoreMutex] to true unless fully aware of the consequences
  Future<void> disconnect({
    required String reason,
    bool ignoreMutex = false,
  }) async {
    if (ignoreMutex) {
      await _disconnectHelper(reason: reason);
    } else {
      await _requestMutex.protect(() async {
        await _disconnectHelper(reason: reason);
      });
    }
  }

  /// Cancels the subscription, drops both sockets and fails queued requests.
  Future<void> _disconnectHelper({required String reason}) async {
    await _subscription?.cancel();
    _subscription = null;
    _socket?.destroy();
    _socket = null;
    await _socksSocket?.close();
    _socksSocket = null;

    // clean up remaining queue
    await _requestQueue.completeRemainingWithError(
      "JsonRPC disconnect() called with reason: \"$reason\"",
    );
  }

  /// Drops any existing connection and opens a new one.
  ///
  /// Throws a [JsonRpcException] when Tor is enabled and either [proxyInfo]
  /// is null or the SOCKS connection cannot be established.
  Future<void> _connect() async {
    // ignore mutex is set to true here as _connect is already called within
    // the mutex.protect block. Setting to false here leads to a deadlock
    await disconnect(
      reason: "New connection requested",
      ignoreMutex: true,
    );

    if (!Prefs.instance.useTor) {
      final Socket socket;
      if (useSSL) {
        socket = await SecureSocket.connect(
          host,
          port,
          timeout: connectionTimeout,
          onBadCertificate: (_) => true,
        ); // TODO do not automatically trust bad certificates
      } else {
        socket = await Socket.connect(
          host,
          port,
          timeout: connectionTimeout,
        );
      }

      _socket = socket;
      _subscription = socket.listen(
        _dataHandler,
        onError: _errorHandler,
        onDone: _doneHandler,
        cancelOnError: true,
      );
      return;
    }

    final proxy = proxyInfo;
    if (proxy == null) {
      throw JsonRpcException(
          "JsonRPC.connect failed with useTor=${Prefs.instance.useTor} and proxyInfo is null");
    }

    // instantiate a socks socket at localhost and on the port selected by the tor service
    final socksSocket = await SOCKSSocket.create(
      proxyHost: proxy.host.address,
      proxyPort: proxy.port,
      sslEnabled: useSSL,
    );

    try {
      Logging.instance.log(
          "JsonRPC.connect(): connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...",
          level: LogLevel.Info);

      await socksSocket.connect();

      Logging.instance.log(
          "JsonRPC.connect(): connected to SOCKS socket at $proxyInfo...",
          level: LogLevel.Info);
    } catch (e) {
      Logging.instance.log(
          "JsonRPC.connect(): failed to connect to SOCKS socket at $proxyInfo, $e",
          level: LogLevel.Error);
      await _discardSocksSocket(socksSocket);
      throw JsonRpcException(
          "JsonRPC.connect(): failed to connect to SOCKS socket at $proxyInfo, $e");
    }

    try {
      Logging.instance.log(
          "JsonRPC.connect(): connecting to $host:$port over SOCKS socket at $proxyInfo...",
          level: LogLevel.Info);

      await socksSocket.connectTo(host, port);

      Logging.instance.log(
          "JsonRPC.connect(): connected to $host:$port over SOCKS socket at $proxyInfo",
          level: LogLevel.Info);
    } catch (e) {
      Logging.instance.log(
          "JsonRPC.connect(): failed to connect to $host over tor proxy at $proxyInfo, $e",
          level: LogLevel.Error);
      await _discardSocksSocket(socksSocket);
      throw JsonRpcException(
          "JsonRPC.connect(): failed to connect to tor proxy, $e");
    }

    // Only publish the socket once it is fully connected, so a failed
    // attempt never leaves a dead socket that later requests would reuse.
    _socksSocket = socksSocket;
    _subscription = socksSocket.listen(
      _dataHandler,
      onError: _errorHandler,
      onDone: _doneHandler,
      cancelOnError: true,
    );
  }

  /// Best-effort close of a SOCKS socket whose connection attempt failed.
  Future<void> _discardSocksSocket(SOCKSSocket socksSocket) async {
    try {
      await socksSocket.close();
    } catch (e) {
      // The connection failure itself is reported by the caller; a failing
      // close must not mask it.
      Logging.instance.log(
        "JsonRPC.connect(): failed to close SOCKS socket after connection failure, $e",
        level: LogLevel.Warning,
      );
    }
  }
}
2022-08-26 08:11:35 +00:00
2023-05-25 20:52:07 +00:00
/// Ordered list of pending requests; every operation runs under [_lock].
class _JsonRPCRequestQueue {
  final List<_JsonRPCRequest> _rq = [];
  final _lock = Mutex();

  /// Appends [req] to the queue.
  ///
  /// [onInitialRequestAdded] is invoked (while the lock is still held) when
  /// [req] is the only entry in the queue after being added.
  Future<void> add(
    _JsonRPCRequest req, {
    VoidCallback? onInitialRequestAdded,
  }) async {
    await _lock.protect(() async {
      _rq.add(req);
      final isOnlyEntry = _rq.length == 1;
      if (isOnlyEntry && onInitialRequestAdded != null) {
        onInitialRequestAdded();
      }
    });
  }

  /// Removes [req]; resolves to whether it was present in the queue.
  Future<bool> remove(_JsonRPCRequest req) async {
    return await _lock.protect(() async => _rq.remove(req));
  }

  /// The first request that has not completed yet, or `null` if none.
  ///
  /// As a side effect, completed requests in front of the returned one are
  /// dropped from the queue (all of them when nothing is pending).
  Future<_JsonRPCRequest?> get nextIncompleteReq async {
    return await _lock.protect(() async {
      final firstPending = _rq.indexWhere((entry) => !entry.isComplete);

      if (firstPending < 0) {
        // Every entry (if any) is already completed.
        _rq.removeRange(0, _rq.length);
        return null;
      }

      final pending = _rq[firstPending];
      _rq.removeRange(0, firstPending);
      return pending;
    });
  }

  /// Fails every request that is still pending with an [Exception] built
  /// from [error], then empties the queue.
  Future<void> completeRemainingWithError(
    String error, {
    StackTrace? stackTrace,
  }) async {
    await _lock.protect(() async {
      final stillPending = _rq.where((entry) => !entry.isComplete);
      for (final entry in stillPending) {
        entry.completer.completeError(Exception(error), stackTrace);
      }
      _rq.clear();
    });
  }

  /// Whether the queue currently holds no requests.
  Future<bool> get isEmpty async {
    return await _lock.protect(() async => _rq.isEmpty);
  }
}
2023-05-24 20:55:55 +00:00
2023-05-25 20:52:07 +00:00
/// A single queued JSON-RPC request together with its partially received
/// response bytes and the completer that delivers the result.
class _JsonRPCRequest {
  // 0x0A is newline
  // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
  static const int separatorByte = 0x0A;

  /// The serialized request as written to the socket.
  final String jsonRequest;

  /// Completed with the decoded response, a timeout response, or an error.
  final Completer<JsonRPCResponse> completer;

  /// How long to wait for a response once [initiateTimeout] is called.
  final Duration requestTimeout;

  final List<int> _responseData = [];

  _JsonRPCRequest({
    required this.jsonRequest,
    required this.completer,
    required this.requestTimeout,
  });

  /// Buffers [data] and, when the chunk ends with [separatorByte], decodes
  /// the buffered bytes as JSON and completes [completer].
  ///
  /// Empty chunks and chunks arriving after completion are ignored, so the
  /// completer is never completed twice. A decode failure completes the
  /// request with that error.
  void appendDataAndCheckIfComplete(List<int> data) {
    if (data.isEmpty || isComplete) {
      return;
    }

    _responseData.addAll(data);

    if (data.last != separatorByte) {
      return;
    }

    try {
      // The bytes are UTF-8 encoded JSON; decoding them one code unit per
      // byte would corrupt any non-ASCII character. Malformed sequences are
      // replaced rather than rejected so an odd byte does not fail an
      // otherwise valid response.
      final response = json.decode(
        utf8.decode(_responseData, allowMalformed: true),
      );
      completer.complete(JsonRPCResponse(data: response));
    } catch (e, s) {
      Logging.instance.log(
        "JsonRPC json.decode: $e\n$s",
        level: LogLevel.Error,
      );
      completer.completeError(e, s);
    }
  }

  /// Starts the timeout for this request.
  ///
  /// If the request is still pending after [requestTimeout], it is completed
  /// with a response carrying a [JsonRpcException] and [onTimedOut] is
  /// invoked. Nothing happens when the request has already completed: calling
  /// [onTimedOut] for a finished request would make the owner re-send
  /// whatever request is currently at the head of its queue.
  void initiateTimeout({
    required VoidCallback onTimedOut,
  }) {
    Future<void>.delayed(requestTimeout).then((_) {
      if (isComplete) {
        return;
      }

      completer.complete(
        JsonRPCResponse(
          data: null,
          exception: JsonRpcException(
            "_JsonRPCRequest timed out: $jsonRequest",
          ),
        ),
      );
      onTimedOut.call();
    });
  }

  /// Whether [completer] has been completed (with a value or an error).
  bool get isComplete => completer.isCompleted;
}
2023-05-29 15:16:25 +00:00
/// Result of a JSON-RPC request: either decoded [data] or an [exception].
class JsonRPCResponse {
  /// Creates a response; both fields are optional and default to `null`.
  JsonRPCResponse({
    this.data,
    this.exception,
  });

  /// The decoded JSON payload, if any.
  final dynamic data;

  /// The failure that occurred, if any.
  final JsonRpcException? exception;
}