stack_wallet/lib/electrumx_rpc/rpc.dart
sneurlax 8492773438 WIP very messy
need to remove SocksSocket as it apparently doesn't work, whereas SocksProxy does
2023-08-09 18:28:19 -05:00

494 lines
14 KiB
Dart

/*
* This file is part of Stack Wallet.
*
* Copyright (c) 2023 Cypher Stack
* All Rights Reserved.
* The code is distributed under GPLv3 license, see LICENSE file for details.
* Generated by Cypher Stack on 2023-05-26
*
*/
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/networking/socks5.dart';
import 'package:stackwallet/networking/tor_service.dart';
import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/prefs.dart';
/// JSON-RPC client for a single ElectrumX server.
///
/// Requests are queued and written to the server one at a time, each
/// terminated by `\r\n`. Depending on `Prefs.instance.useTor` the transport
/// is either a direct (optionally TLS-wrapped) socket or a tunnel through a
/// SOCKS5 proxy ([SOCKS5Proxy]).
class JsonRPC {
  JsonRPC({
    required this.host,
    required this.port,
    this.useSSL = false,
    this.connectionTimeout = const Duration(seconds: 60),
    required this.proxyInfo,
  });

  /// Whether direct (non-Tor) connections are wrapped in TLS.
  final bool useSSL;

  /// Host name or address of the ElectrumX server.
  final String host;

  /// TCP port of the ElectrumX server.
  final int port;

  /// Upper bound for establishing the connection (and, over Tor, for each
  /// step of the SOCKS5 handshake).
  final Duration connectionTimeout;

  /// SOCKS5 proxy endpoint used when Tor is enabled.
  ///
  /// When null, [SOCKS5Proxy] falls back to its own defaults.
  ({String host, int port})? proxyInfo;

  final _requestMutex = Mutex();
  final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();

  // At most one of these two transports is non-null at any time.
  Socket? _socket;
  SOCKS5Proxy? _socksProxy;
  StreamSubscription<Uint8List>? _subscription;

  /// Feeds incoming bytes to the oldest request that is still incomplete.
  void _dataHandler(List<int> data) {
    _requestQueue.nextIncompleteReq.then((req) {
      if (req == null) {
        Logging.instance.log(
          "_dataHandler found a null req!",
          level: LogLevel.Warning,
        );
        return;
      }
      req.appendDataAndCheckIfComplete(data);
      if (req.isComplete) {
        _onReqCompleted(req);
      }
    });
  }

  /// Fails the oldest incomplete request with the transport [error].
  void _errorHandler(Object error, StackTrace trace) {
    _requestQueue.nextIncompleteReq.then((req) {
      if (req != null) {
        req.completer.completeError(error, trace);
        _onReqCompleted(req);
      }
    });
  }

  /// Invoked when the remote end closes the stream.
  void _doneHandler() {
    disconnect(reason: "JsonRPC _doneHandler() called");
  }

  /// Drops the finished [req] from the queue and sends the next one, if any.
  void _onReqCompleted(_JsonRPCRequest req) {
    _requestQueue.remove(req).then((_) {
      // attempt to send next request
      _sendNextAvailableRequest();
    });
  }

  /// Writes the oldest incomplete request to the active transport and starts
  /// its timeout.
  void _sendNextAvailableRequest() {
    _requestQueue.nextIncompleteReq.then((req) {
      if (req == null) {
        return;
      }

      // \r\n required by electrumx server
      final line = '${req.jsonRequest}\r\n';
      _socket?.write(line);
      _socksProxy?.write(line);

      // TODO different timeout length?
      req.initiateTimeout(
        onTimedOut: () {
          _requestQueue.remove(req);
        },
      );
    });
  }

  /// Sends [jsonRpcRequest] and returns the server's response.
  ///
  /// Connects first when the transport required by the current Tor
  /// preference is not open; errors thrown by [connect] propagate to the
  /// caller. Once the request is queued, failures (including exceeding
  /// [requestTimeout]) trigger [disconnect] and are reported through
  /// [JsonRPCResponse.exception] instead of being thrown.
  Future<JsonRPCResponse> request(
    String jsonRpcRequest,
    Duration requestTimeout,
  ) async {
    await _requestMutex.protect(() async {
      final useTor = Prefs.instance.useTor;
      final isConnected = useTor ? _socksProxy != null : _socket != null;
      if (!isConnected) {
        Logging.instance.log(
          useTor
              ? "JsonRPC request: opening SOCKS proxy to $host:$port"
              : "JsonRPC request: opening socket $host:$port",
          level: LogLevel.Info,
        );
        await connect();
      }
    });

    final req = _JsonRPCRequest(
      jsonRequest: jsonRpcRequest,
      requestTimeout: requestTimeout,
      completer: Completer<JsonRPCResponse>(),
    );

    final future = req.completer.future.onError(
      (error, stackTrace) async {
        await disconnect(
          reason: "return req.completer.future.onError: $error\n$stackTrace",
        );
        return JsonRPCResponse(
          exception: error is Exception
              ? error
              : Exception(
                  "req.completer.future.onError: $error\n$stackTrace",
                ),
        );
      },
    );

    // if this is the only/first request then send it right away
    await _requestQueue.add(
      req,
      onInitialRequestAdded: _sendNextAvailableRequest,
    );

    return future;
  }

  /// Closes the active transport and fails every queued request with an
  /// error that mentions [reason].
  Future<void> disconnect({required String reason}) async {
    await _requestMutex.protect(() async {
      await _closeTransport();

      // clean up remaining queue
      await _requestQueue.completeRemainingWithError(
        "JsonRPC disconnect() called with reason: \"$reason\"",
      );
    });
  }

  /// Opens the transport selected by `Prefs.instance.useTor`.
  ///
  /// Any transport that is already open is closed first, so a stale socket
  /// is never leaked or left in use after the Tor preference changes.
  Future<void> connect() async {
    await _closeTransport();

    if (!Prefs.instance.useTor) {
      final Socket socket = useSSL
          ? await SecureSocket.connect(
              host,
              port,
              timeout: connectionTimeout,
              onBadCertificate: (_) => true,
            ) // TODO do not automatically trust bad certificates
          : await Socket.connect(
              host,
              port,
              timeout: connectionTimeout,
            );

      _socket = socket;
      _subscription = socket.listen(
        _dataHandler,
        onError: _errorHandler,
        onDone: _doneHandler,
        cancelOnError: true,
      );
      return;
    }

    // TODO await tor / make sure it's running
    if (useSSL) {
      // TODO implement ssl over tor
      Logging.instance.log(
        "JsonRPC.connect(): SSL over Tor is not implemented, "
        "connecting to $host:$port without TLS",
        level: LogLevel.Warning,
      );
    }

    final proxy = SOCKS5Proxy(host: proxyInfo?.host, port: proxyInfo?.port);
    // The proxy releases its own socket when either step fails, so it is
    // only published to the fields once the tunnel is usable.
    await proxy.connect(timeout: connectionTimeout);
    await proxy.connectTo(host, port, timeout: connectionTimeout);

    _socksProxy = proxy;
    _subscription = proxy.listen(
      _dataHandler,
      onError: _errorHandler,
      onDone: _doneHandler,
    );
  }

  /// Releases whichever transport is open without touching the request
  /// queue. Does not take [_requestMutex]; callers handle locking.
  Future<void> _closeTransport() async {
    final subscription = _subscription;
    final socket = _socket;
    final proxy = _socksProxy;
    _subscription = null;
    _socket = null;
    _socksProxy = null;

    await subscription?.cancel();
    socket?.destroy();
    await proxy?.close();
  }
}

/// Minimal SOCKS5 client (no authentication, CONNECT by domain name).
///
/// A [Socket] is a single-subscription stream, so the handshake and the
/// caller share one subscription: it is created in [connect], used to read
/// the handshake replies, and then handed to the caller by [listen].
class SOCKS5Proxy {
  SOCKS5Proxy({String? host, int? port})
      : host = host ?? InternetAddress.loopbackIPv4.address,
        port = port ?? TorService.sharedInstance.port;

  /// Address of the SOCKS5 proxy.
  final String host;

  /// Port of the SOCKS5 proxy.
  final int port;

  Socket? _socks5Socket;
  StreamSubscription<Uint8List>? _subscription;

  /// Bytes received during the handshake that have not been consumed yet.
  final List<int> _handshakeBuffer = [];

  /// The socket connected to the proxy.
  ///
  /// Use it for writing only; incoming data must be read through [listen]
  /// because the socket's stream is already subscribed to.
  ///
  /// Throws a [StateError] when the proxy is not connected.
  Socket get socket {
    final socket = _socks5Socket;
    if (socket == null) {
      throw StateError('SOCKS5Proxy is not connected; call connect() first.');
    }
    return socket;
  }

  /// Connects to the proxy and negotiates the "no authentication" method.
  ///
  /// [timeout] bounds the TCP connect and each wait for proxy data. On
  /// failure the socket is closed and the error is rethrown.
  Future<void> connect({Duration? timeout}) async {
    await close();
    try {
      final socket = await Socket.connect(host, port, timeout: timeout);
      _socks5Socket = socket;
      // Kept paused between reads so no event is delivered unobserved.
      _subscription = socket.listen(null, cancelOnError: true)..pause();

      // Greeting and method selection: version 5, one method, no auth.
      socket.add([0x05, 0x01, 0x00]);

      // Reply is exactly two bytes: version, chosen method.
      final reply = await _readBytes(2, timeout: timeout);
      if (reply[0] != 0x05 || reply[1] != 0x00) {
        throw Exception('Failed to connect to SOCKS5 proxy.');
      }
    } catch (_) {
      await close();
      rethrow;
    }
  }

  /// Asks the proxy to open a tunnel to [domain]:[port] and waits for its
  /// reply.
  ///
  /// This is just a basic implementation for domain-based addresses.
  /// Throws an [ArgumentError] when [domain] does not encode to 1-255 bytes
  /// and a [RangeError] when [port] is outside 0-65535. On a failed or
  /// refused request the socket is closed and the error is rethrown.
  Future<void> connectTo(String domain, int port, {Duration? timeout}) async {
    final domainBytes = utf8.encode(domain);
    if (domainBytes.isEmpty || domainBytes.length > 0xFF) {
      throw ArgumentError.value(
        domain,
        'domain',
        'must encode to between 1 and 255 bytes',
      );
    }
    RangeError.checkValueInInterval(port, 0, 0xFFFF, 'port');

    try {
      // Version, Command (CONNECT), Reserved, Address Type (domain),
      // Address length, Address, Port (big endian)
      socket.add([
        0x05,
        0x01,
        0x00,
        0x03,
        domainBytes.length,
        ...domainBytes,
        (port >> 8) & 0xFF,
        port & 0xFF,
      ]);

      // Reply: version, status, reserved, address type, bound address, port.
      final header = await _readBytes(4, timeout: timeout);
      if (header[0] != 0x05 || header[1] != 0x00) {
        throw Exception('Failed to connect to target through SOCKS5 proxy.');
      }

      final addressType = header[3];
      final int addressLength;
      if (addressType == 0x01) {
        addressLength = 4; // IPv4
      } else if (addressType == 0x04) {
        addressLength = 16; // IPv6
      } else if (addressType == 0x03) {
        // Domain name, prefixed with its length.
        addressLength = (await _readBytes(1, timeout: timeout))[0];
      } else {
        throw Exception('Failed to connect to target through SOCKS5 proxy.');
      }

      // Consume the bound address and port so that they are not mistaken
      // for payload data later on.
      await _readBytes(addressLength + 2, timeout: timeout);
    } catch (_) {
      await close();
      rethrow;
    }
  }

  /// Starts delivering data received through the tunnel.
  ///
  /// Rebinds the handlers of the subscription created by [connect] and
  /// resumes it. The subscription cancels itself on the first error.
  ///
  /// Throws a [StateError] when the proxy is not connected.
  StreamSubscription<Uint8List> listen(
    void Function(Uint8List data)? onData, {
    Function? onError,
    void Function()? onDone,
  }) {
    final subscription = _subscription;
    if (subscription == null) {
      throw StateError('SOCKS5Proxy is not connected; call connect() first.');
    }

    // Hand over anything that arrived together with the last handshake reply.
    if (_handshakeBuffer.isNotEmpty) {
      final surplus = Uint8List.fromList(_handshakeBuffer);
      _handshakeBuffer.clear();
      onData?.call(surplus);
    }

    return subscription
      ..onData(onData)
      ..onError(onError)
      ..onDone(onDone)
      ..resume();
  }

  /// Converts [object] to a String by invoking [Object.toString] and
  /// sends the UTF-8 encoding of the result to the socket.
  void write(Object? object) {
    if (object == null) return;
    socket.add(utf8.encode(object.toString()));
  }

  /// Cancels the subscription and destroys the socket. Safe to call when
  /// not connected and safe to call repeatedly.
  Future<void> close() async {
    final subscription = _subscription;
    final socket = _socks5Socket;
    _subscription = null;
    _socks5Socket = null;
    _handshakeBuffer.clear();

    await subscription?.cancel();
    socket?.destroy();
  }

  /// Returns exactly [count] bytes from the proxy, reading further chunks
  /// as needed and keeping any surplus buffered for the next read.
  Future<Uint8List> _readBytes(int count, {Duration? timeout}) async {
    while (_handshakeBuffer.length < count) {
      final pending = _nextChunk();
      _handshakeBuffer.addAll(
        timeout == null ? await pending : await pending.timeout(timeout),
      );
    }
    final bytes = Uint8List.fromList(_handshakeBuffer.sublist(0, count));
    _handshakeBuffer.removeRange(0, count);
    return bytes;
  }

  /// Resumes the subscription until one chunk of data arrives, then pauses
  /// it again. Completes with an error if the stream fails or closes first.
  Future<Uint8List> _nextChunk() {
    final subscription = _subscription;
    if (subscription == null) {
      throw StateError('SOCKS5Proxy is not connected; call connect() first.');
    }

    final chunk = Completer<Uint8List>();
    subscription
      ..onData((data) {
        subscription.pause();
        if (!chunk.isCompleted) {
          chunk.complete(data);
        }
      })
      ..onError((Object error, StackTrace trace) {
        if (!chunk.isCompleted) {
          chunk.completeError(error, trace);
        }
      })
      ..onDone(() {
        if (!chunk.isCompleted) {
          chunk.completeError(
            SocketException(
              'SOCKS5 proxy closed the connection during the handshake.',
            ),
          );
        }
      })
      ..resume();
    return chunk.future;
  }
}
/// Ordered queue of [_JsonRPCRequest]s.
///
/// Every operation runs inside [_lock], so accesses to the backing list
/// never interleave.
class _JsonRPCRequestQueue {
  final _lock = Mutex();
  final List<_JsonRPCRequest> _rq = [];

  /// Appends [req] to the queue.
  ///
  /// [onInitialRequestAdded] is invoked (while the lock is held) when [req]
  /// is the only entry in the queue after being added.
  Future<void> add(
    _JsonRPCRequest req, {
    VoidCallback? onInitialRequestAdded,
  }) async {
    await _lock.protect(() async {
      _rq.add(req);
      final isOnlyEntry = _rq.length == 1;
      if (isOnlyEntry) {
        onInitialRequestAdded?.call();
      }
    });
  }

  /// Removes [req] from the queue; resolves to whether it was present.
  Future<bool> remove(_JsonRPCRequest req) async {
    final wasPresent = await _lock.protect<bool>(() async => _rq.remove(req));
    return wasPresent;
  }

  /// The first request that has not completed yet, or null if there is none.
  ///
  /// Completed requests in front of it are discarded as a side effect.
  Future<_JsonRPCRequest?> get nextIncompleteReq async {
    return await _lock.protect<_JsonRPCRequest?>(() async {
      final firstPending = _rq.indexWhere((queued) => !queued.isComplete);
      if (firstPending == -1) {
        // Everything queued (if anything) has already completed.
        _rq.clear();
        return null;
      }

      final next = _rq[firstPending];
      _rq.removeRange(0, firstPending);
      return next;
    });
  }

  /// Fails every request that is still incomplete with an [Exception]
  /// carrying [error], then empties the queue.
  Future<void> completeRemainingWithError(
    String error, {
    StackTrace? stackTrace,
  }) async {
    await _lock.protect(() async {
      for (final queued in _rq) {
        if (queued.isComplete) {
          continue;
        }
        queued.completer.completeError(Exception(error), stackTrace);
      }
      _rq.clear();
    });
  }

  /// Whether the queue currently holds no requests.
  Future<bool> get isEmpty async {
    final empty = await _lock.protect<bool>(() async => _rq.isEmpty);
    return empty;
  }
}
/// A single in-flight JSON-RPC request together with the bytes of its
/// response received so far.
class _JsonRPCRequest {
  // 0x0A is newline
  // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
  static const int separatorByte = 0x0A;

  /// The serialized JSON request (without line terminator).
  final String jsonRequest;

  /// Completed with the decoded response, or with an error on failure.
  final Completer<JsonRPCResponse> completer;

  /// How long [initiateTimeout] waits before failing the request.
  final Duration requestTimeout;

  final List<int> _responseData = [];

  _JsonRPCRequest({
    required this.jsonRequest,
    required this.completer,
    required this.requestTimeout,
  });

  /// Appends [data] to the buffered response and, when [data] ends with
  /// [separatorByte], decodes the buffer and completes [completer].
  ///
  /// A decoding failure is logged and completes [completer] with the error.
  void appendDataAndCheckIfComplete(List<int> data) {
    // An empty chunk carries no terminator; `data.last` would throw on it.
    if (data.isEmpty) {
      return;
    }

    _responseData.addAll(data);
    if (data.last != separatorByte) {
      return;
    }

    try {
      // The buffer holds raw bytes of UTF-8 encoded JSON, so it has to be
      // decoded as UTF-8; treating each byte as a char code would corrupt
      // any non-ASCII text. Malformed sequences are replaced rather than
      // rejected so a stray byte does not fail an otherwise valid response.
      final response = json.decode(
        utf8.decode(_responseData, allowMalformed: true),
      );
      completer.complete(JsonRPCResponse(data: response));
    } catch (e, s) {
      Logging.instance.log(
        "JsonRPC json.decode: $e\n$s",
        level: LogLevel.Error,
      );
      completer.completeError(e, s);
    }
  }

  /// Fails the request once [requestTimeout] has elapsed, unless it has
  /// completed by then. [onTimedOut] runs after the request was failed.
  void initiateTimeout({
    VoidCallback? onTimedOut,
  }) {
    Future<void>.delayed(requestTimeout).then((_) {
      if (isComplete) {
        return;
      }
      completer.completeError(
        Exception("_JsonRPCRequest timed out: $jsonRequest"),
        StackTrace.current,
      );
      onTimedOut?.call();
    });
  }

  /// Whether [completer] has been completed (with a value or an error).
  bool get isComplete => completer.isCompleted;
}
/// Result of a JSON-RPC request: either a payload in [data] or a failure in
/// [exception]. Both fields are optional.
class JsonRPCResponse {
  JsonRPCResponse({this.data, this.exception});

  /// The response payload; null when none was supplied.
  final dynamic data;

  /// The failure associated with the request; null when none was supplied.
  final Exception? exception;
}
/// Returns whether [host] is a numeric IPv4 or IPv6 address literal, as
/// opposed to a host name.
///
/// Uses the non-throwing [InternetAddress.tryParse] rather than catching
/// whatever the [InternetAddress] constructor throws.
bool isIpAddress(String host) => InternetAddress.tryParse(host) != null;