use Tor in subscribable client where applicable

This commit is contained in:
sneurlax 2024-02-05 14:09:13 -06:00
parent 0d5a8f25a1
commit 5835b1e4a7

View file

@ -12,8 +12,13 @@ import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:socks_socket/socks_socket.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
import 'package:stackwallet/services/tor_service.dart';
import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/prefs.dart';
class ElectrumXSubscription {
final StreamController<dynamic> _controller =
@ -40,6 +45,7 @@ class SubscribableElectrumXClient {
final Map<String, SocketTask> _tasks = {};
Timer? _aliveTimer;
Socket? _socket;
SOCKSSocket? _socksSocket;
late final bool _useSSL;
late final Duration _connectionTimeout;
late final Duration _keepAlive;
@ -49,6 +55,8 @@ class SubscribableElectrumXClient {
void Function(bool)? onConnectionStatusChanged;
late Prefs _prefs;
late TorService _torService;
SubscribableElectrumXClient({
bool useSSL = true,
this.onConnectionStatusChanged,
@ -83,33 +91,51 @@ class SubscribableElectrumXClient {
// return client;
// }
Future<void> connect({required String host, required int port}) async {
/// Connect to the server.
///
/// If Tor is enabled, it will attempt to connect through Tor.
Future<void> connect({
required String host,
required int port,
}) async {
try {
await _socket?.close();
} catch (_) {}
if (_useSSL) {
try {
_socket = await SecureSocket.connect(
host,
port,
timeout: _connectionTimeout,
onBadCertificate: (_) =>
true, // TODO do not automatically trust bad certificates.
);
} catch (e, s) {
Logging.instance.log(
"Error connecting in SubscribableElectrumXClient"
"\nError: $e\nStack trace: $s",
level: LogLevel.Error);
}
if (!Prefs.instance.useTor) {
await connectClearnet(host, port);
} else {
_socket = await Socket.connect(
host,
port,
timeout: _connectionTimeout,
);
// If we're supposed to use Tor...
if (_torService.status != TorConnectionStatus.connected) {
// ... but Tor isn't running...
if (!_prefs.torKillSwitch) {
// ... and the killswitch isn't set, then we'll connect clearnet.
Logging.instance.log(
"Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet",
level: LogLevel.Warning,
);
await connectClearnet(host, port);
} else {
// ... but if the killswitch is set, then let's try to start Tor.
await _torService.start();
// TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature.
// Doublecheck that Tor is running.
if (_torService.status != TorConnectionStatus.connected) {
// If Tor still isn't running, then we'll throw an exception.
throw Exception(
"Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX");
}
// Connect via Tor.
await connectTor(host, port);
}
} else {
// Connect via Tor.
await connectTor(host, port);
}
}
_updateConnectionStatus(true);
_socket!.listen(
@ -126,12 +152,127 @@ class SubscribableElectrumXClient {
);
}
/// Connect to the server directly.
Future<void> connectClearnet(String host, int port) async {
try {
Logging.instance.log(
"SubscribableElectrumXClient.connectClearnet(): "
"creating a socket to $host:$port (SSL $useSSL)...",
level: LogLevel.Info);
if (_useSSL) {
_socket = await SecureSocket.connect(
host,
port,
timeout: _connectionTimeout,
onBadCertificate: (_) =>
true, // TODO do not automatically trust bad certificates.
);
} else {
_socket = await Socket.connect(
host,
port,
timeout: _connectionTimeout,
);
}
Logging.instance.log(
"SubscribableElectrumXClient.connectClearnet(): "
"created socket to $host:$port...",
level: LogLevel.Info);
} catch (e, s) {
final String msg = "SubscribableElectrumXClient.connectClearnet: "
"failed to connect to $host (SSL: $useSSL)."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
return;
}
/// Connect to the server using the Tor service.
Future<void> connectTor(String host, int port) async {
// Get the proxy info from the TorService.
final proxyInfo = _torService.getProxyInfo();
try {
Logging.instance.log(
"SubscribableElectrumXClient.connectTor(): "
"creating a SOCKS socket at $proxyInfo (SSL $useSSL)...",
level: LogLevel.Info);
// Create a socks socket using the Tor service's proxy info.
_socksSocket = await SOCKSSocket.create(
proxyHost: proxyInfo.host.address,
proxyPort: proxyInfo.port,
sslEnabled: useSSL,
);
Logging.instance.log(
"SubscribableElectrumXClient.connectTor(): "
"created SOCKS socket at $proxyInfo...",
level: LogLevel.Info);
} catch (e, s) {
final String msg = "SubscribableElectrumXClient.connectTor(): "
"failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
try {
Logging.instance.log(
"SubscribableElectrumXClient.connectTor(): "
"connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...",
level: LogLevel.Info);
await _socksSocket?.connect();
Logging.instance.log(
"SubscribableElectrumXClient.connectTor(): "
"connected to SOCKS socket at $proxyInfo...",
level: LogLevel.Info);
} catch (e, s) {
final String msg = "SubscribableElectrumXClient.connectTor(): "
"failed to connect to SOCKS socket at $proxyInfo.."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
try {
Logging.instance.log(
"SubscribableElectrumXClient.connectTor(): "
"connecting to $host:$port over SOCKS socket at $proxyInfo...",
level: LogLevel.Info);
await _socksSocket?.connectTo(host, port);
Logging.instance.log(
"SubscribableElectrumXClient.connectTor(): "
"connected to $host:$port over SOCKS socket at $proxyInfo",
level: LogLevel.Info);
} catch (e, s) {
final String msg = "SubscribableElectrumXClient.connectTor(): "
"failed to connect $host over tor proxy at $proxyInfo."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
return;
}
/// Disconnect from the server.
Future<void> disconnect() async {
_aliveTimer?.cancel();
await _socket?.close();
await _socksSocket?.close();
onConnectionStatusChanged = null;
}
/// Format JSON request string.
String _buildJsonRequestString({
required String method,
required String id,
@ -254,19 +395,39 @@ class SubscribableElectrumXClient {
final completer = Completer<dynamic>();
_currentRequestID++;
final id = _currentRequestID.toString();
_addTask(id: id, completer: completer);
_socket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
try {
_addTask(id: id, completer: completer);
return completer.future;
if (_prefs.useTor) {
_socksSocket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
} else {
_socket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
}
return completer.future;
} catch (e, s) {
final String msg = "SubscribableElectrumXClient._call: "
"failed to request $method with id $id."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
}
/// Write call to socket with timeout.
Future<dynamic> _callWithTimeout({
required String method,
List<dynamic> params = const [],
@ -275,49 +436,82 @@ class SubscribableElectrumXClient {
final completer = Completer<dynamic>();
_currentRequestID++;
final id = _currentRequestID.toString();
_addTask(id: id, completer: completer);
_socket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
try {
_addTask(id: id, completer: completer);
Timer(timeout, () {
if (!completer.isCompleted) {
completer.completeError(
Exception("Request \"id: $id, method: $method\" timed out!"),
if (_prefs.useTor) {
_socksSocket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
} else {
_socket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
}
});
return completer.future;
Timer(timeout, () {
if (!completer.isCompleted) {
completer.completeError(
Exception("Request \"id: $id, method: $method\" timed out!"),
);
}
});
return completer.future;
} catch (e, s) {
final String msg = "SubscribableElectrumXClient._callWithTimeout: "
"failed to request $method with id $id (timeout $timeout)."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
}
ElectrumXSubscription _subscribe({
required String taskId,
required String id,
required String method,
List<dynamic> params = const [],
}) {
// try {
final subscription = ElectrumXSubscription();
_addSubscriptionTask(id: taskId, subscription: subscription);
_currentRequestID++;
_socket?.write(
_buildJsonRequestString(
method: method,
id: taskId,
params: params,
),
);
try {
final subscription = ElectrumXSubscription();
_addSubscriptionTask(id: id, subscription: subscription);
_currentRequestID++;
return subscription;
// } catch (e, s) {
// Logging.instance.log("SubscribableElectrumXClient _subscribe: $e\n$s", level: LogLevel.Error);
// return null;
// }
if (_prefs.useTor) {
_socksSocket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
} else {
_socket?.write(
_buildJsonRequestString(
method: method,
id: id,
params: params,
),
);
}
return subscription;
} catch (e, s) {
final String msg = "SubscribableElectrumXClient._subscribe: "
"failed to subscribe to $method with id $id."
"\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw JsonRpcException(msg);
}
}
/// Ping the server to ensure it is responding
@ -335,7 +529,7 @@ class SubscribableElectrumXClient {
/// Subscribe to a scripthash to receive notifications on status changes
ElectrumXSubscription subscribeToScripthash({required String scripthash}) {
return _subscribe(
taskId: 'blockchain.scripthash.subscribe:$scripthash',
id: 'blockchain.scripthash.subscribe:$scripthash',
method: 'blockchain.scripthash.subscribe',
params: [scripthash],
);
@ -347,7 +541,7 @@ class SubscribableElectrumXClient {
ElectrumXSubscription subscribeToBlockHeaders() {
return _tasks["blockchain.headers.subscribe"]?.subscription ??
_subscribe(
taskId: "blockchain.headers.subscribe",
id: "blockchain.headers.subscribe",
method: "blockchain.headers.subscribe",
params: [],
);