diff --git a/lib/electrumx_rpc/subscribable_electrumx_client.dart b/lib/electrumx_rpc/subscribable_electrumx_client.dart index ca4d8bd9f..d7c6f0259 100644 --- a/lib/electrumx_rpc/subscribable_electrumx_client.dart +++ b/lib/electrumx_rpc/subscribable_electrumx_client.dart @@ -12,8 +12,13 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'package:socks_socket/socks_socket.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; +import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; +import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; +import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/utilities/logger.dart'; +import 'package:stackwallet/utilities/prefs.dart'; class ElectrumXSubscription { final StreamController _controller = @@ -40,6 +45,7 @@ class SubscribableElectrumXClient { final Map _tasks = {}; Timer? _aliveTimer; Socket? _socket; + SOCKSSocket? _socksSocket; late final bool _useSSL; late final Duration _connectionTimeout; late final Duration _keepAlive; @@ -49,6 +55,8 @@ class SubscribableElectrumXClient { void Function(bool)? onConnectionStatusChanged; + late Prefs _prefs; + late TorService _torService; SubscribableElectrumXClient({ bool useSSL = true, this.onConnectionStatusChanged, @@ -83,33 +91,51 @@ class SubscribableElectrumXClient { // return client; // } - Future connect({required String host, required int port}) async { + /// Connect to the server. + /// + /// If Tor is enabled, it will attempt to connect through Tor. + Future connect({ + required String host, + required int port, + }) async { try { await _socket?.close(); } catch (_) {} - if (_useSSL) { - try { - _socket = await SecureSocket.connect( - host, - port, - timeout: _connectionTimeout, - onBadCertificate: (_) => - true, // TODO do not automatically trust bad certificates. - ); - } catch (e, s) { - Logging.instance.log( - "Error connecting in SubscribableElectrumXClient" - "\nError: $e\nStack trace: $s", - level: LogLevel.Error); - } + if (!Prefs.instance.useTor) { + await connectClearnet(host, port); } else { - _socket = await Socket.connect( - host, - port, - timeout: _connectionTimeout, - ); + // If we're supposed to use Tor... + if (_torService.status != TorConnectionStatus.connected) { + // ... but Tor isn't running... + if (!_prefs.torKillSwitch) { + // ... and the killswitch isn't set, then we'll connect clearnet. + Logging.instance.log( + "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", + level: LogLevel.Warning, + ); + await connectClearnet(host, port); + } else { + // ... but if the killswitch is set, then let's try to start Tor. + await _torService.start(); + // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. + + // Doublecheck that Tor is running. + if (_torService.status != TorConnectionStatus.connected) { + // If Tor still isn't running, then we'll throw an exception. + throw Exception( + "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); + } + + // Connect via Tor. + await connectTor(host, port); + } + } else { + // Connect via Tor. + await connectTor(host, port); + } } + _updateConnectionStatus(true); _socket!.listen( @@ -126,12 +152,127 @@ class SubscribableElectrumXClient { ); } + /// Connect to the server directly. + Future connectClearnet(String host, int port) async { + try { + Logging.instance.log( + "SubscribableElectrumXClient.connectClearnet(): " + "creating a socket to $host:$port (SSL $useSSL)...", + level: LogLevel.Info); + + if (_useSSL) { + _socket = await SecureSocket.connect( + host, + port, + timeout: _connectionTimeout, + onBadCertificate: (_) => + true, // TODO do not automatically trust bad certificates. + ); + } else { + _socket = await Socket.connect( + host, + port, + timeout: _connectionTimeout, + ); + } + + Logging.instance.log( + "SubscribableElectrumXClient.connectClearnet(): " + "created socket to $host:$port...", + level: LogLevel.Info); + } catch (e, s) { + final String msg = "SubscribableElectrumXClient.connectClearnet: " + "failed to connect to $host (SSL: $useSSL)." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } + + return; + } + + /// Connect to the server using the Tor service. + Future connectTor(String host, int port) async { + // Get the proxy info from the TorService. + final proxyInfo = _torService.getProxyInfo(); + + try { + Logging.instance.log( + "SubscribableElectrumXClient.connectTor(): " + "creating a SOCKS socket at $proxyInfo (SSL $useSSL)...", + level: LogLevel.Info); + + // Create a socks socket using the Tor service's proxy info. + _socksSocket = await SOCKSSocket.create( + proxyHost: proxyInfo.host.address, + proxyPort: proxyInfo.port, + sslEnabled: useSSL, + ); + + Logging.instance.log( + "SubscribableElectrumXClient.connectTor(): " + "created SOCKS socket at $proxyInfo...", + level: LogLevel.Info); + } catch (e, s) { + final String msg = "SubscribableElectrumXClient.connectTor(): " + "failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } + + try { + Logging.instance.log( + "SubscribableElectrumXClient.connectTor(): " + "connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...", + level: LogLevel.Info); + + await _socksSocket?.connect(); + + Logging.instance.log( + "SubscribableElectrumXClient.connectTor(): " + "connected to SOCKS socket at $proxyInfo...", + level: LogLevel.Info); + } catch (e, s) { + final String msg = "SubscribableElectrumXClient.connectTor(): " + "failed to connect to SOCKS socket at $proxyInfo.." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } + + try { + Logging.instance.log( + "SubscribableElectrumXClient.connectTor(): " + "connecting to $host:$port over SOCKS socket at $proxyInfo...", + level: LogLevel.Info); + + await _socksSocket?.connectTo(host, port); + + Logging.instance.log( + "SubscribableElectrumXClient.connectTor(): " + "connected to $host:$port over SOCKS socket at $proxyInfo", + level: LogLevel.Info); + } catch (e, s) { + final String msg = "SubscribableElectrumXClient.connectTor(): " + "failed to connect $host over tor proxy at $proxyInfo." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } + + return; + } + + /// Disconnect from the server. Future disconnect() async { _aliveTimer?.cancel(); await _socket?.close(); + await _socksSocket?.close(); onConnectionStatusChanged = null; } + /// Format JSON request string. String _buildJsonRequestString({ required String method, required String id, @@ -254,19 +395,39 @@ class SubscribableElectrumXClient { final completer = Completer(); _currentRequestID++; final id = _currentRequestID.toString(); - _addTask(id: id, completer: completer); - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); + try { + _addTask(id: id, completer: completer); - return completer.future; + if (_prefs.useTor) { + _socksSocket?.write( + _buildJsonRequestString( + method: method, + id: id, + params: params, + ), + ); + } else { + _socket?.write( + _buildJsonRequestString( + method: method, + id: id, + params: params, + ), + ); + } + + return completer.future; + } catch (e, s) { + final String msg = "SubscribableElectrumXClient._call: " + "failed to request $method with id $id." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } } + /// Write call to socket with timeout. Future _callWithTimeout({ required String method, List params = const [], @@ -275,49 +436,82 @@ class SubscribableElectrumXClient { final completer = Completer(); _currentRequestID++; final id = _currentRequestID.toString(); - _addTask(id: id, completer: completer); - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); + try { + _addTask(id: id, completer: completer); - Timer(timeout, () { - if (!completer.isCompleted) { - completer.completeError( - Exception("Request \"id: $id, method: $method\" timed out!"), + if (_prefs.useTor) { + _socksSocket?.write( + _buildJsonRequestString( + method: method, + id: id, + params: params, + ), + ); + } else { + _socket?.write( + _buildJsonRequestString( + method: method, + id: id, + params: params, + ), ); } - }); - return completer.future; + Timer(timeout, () { + if (!completer.isCompleted) { + completer.completeError( + Exception("Request \"id: $id, method: $method\" timed out!"), + ); + } + }); + + return completer.future; + } catch (e, s) { + final String msg = "SubscribableElectrumXClient._callWithTimeout: " + "failed to request $method with id $id (timeout $timeout)." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } } ElectrumXSubscription _subscribe({ - required String taskId, + required String id, required String method, List params = const [], }) { - // try { - final subscription = ElectrumXSubscription(); - _addSubscriptionTask(id: taskId, subscription: subscription); - _currentRequestID++; - _socket?.write( - _buildJsonRequestString( - method: method, - id: taskId, - params: params, - ), - ); + try { + final subscription = ElectrumXSubscription(); + _addSubscriptionTask(id: id, subscription: subscription); + _currentRequestID++; - return subscription; - // } catch (e, s) { - // Logging.instance.log("SubscribableElectrumXClient _subscribe: $e\n$s", level: LogLevel.Error); - // return null; - // } + if (_prefs.useTor) { + _socksSocket?.write( + _buildJsonRequestString( + method: method, + id: id, + params: params, + ), + ); + } else { + _socket?.write( + _buildJsonRequestString( + method: method, + id: id, + params: params, + ), + ); + } + + return subscription; + } catch (e, s) { + final String msg = "SubscribableElectrumXClient._subscribe: " + "failed to subscribe to $method with id $id." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw JsonRpcException(msg); + } } /// Ping the server to ensure it is responding @@ -335,7 +529,7 @@ class SubscribableElectrumXClient { /// Subscribe to a scripthash to receive notifications on status changes ElectrumXSubscription subscribeToScripthash({required String scripthash}) { return _subscribe( - taskId: 'blockchain.scripthash.subscribe:$scripthash', + id: 'blockchain.scripthash.subscribe:$scripthash', method: 'blockchain.scripthash.subscribe', params: [scripthash], ); @@ -347,7 +541,7 @@ class SubscribableElectrumXClient { ElectrumXSubscription subscribeToBlockHeaders() { return _tasks["blockchain.headers.subscribe"]?.subscription ?? _subscribe( - taskId: "blockchain.headers.subscribe", + id: "blockchain.headers.subscribe", method: "blockchain.headers.subscribe", params: [], );