From d4e0f3b04540fb51c24c2d5fefcbd2abe7f06730 Mon Sep 17 00:00:00 2001 From: julian Date: Thu, 7 Sep 2023 12:07:05 -0600 Subject: [PATCH] listen to tor status changes in electrumx --- lib/electrumx_rpc/electrumx.dart | 227 ++++++++++++++----------------- lib/electrumx_rpc/rpc.dart | 18 ++- lib/networking/http.dart | 8 +- lib/networking/tor_service.dart | 62 ++++++++- 4 files changed, 169 insertions(+), 146 deletions(-) diff --git a/lib/electrumx_rpc/electrumx.dart b/lib/electrumx_rpc/electrumx.dart index 674029fea..6e54c69ce 100644 --- a/lib/electrumx_rpc/electrumx.dart +++ b/lib/electrumx_rpc/electrumx.dart @@ -8,14 +8,18 @@ * */ +import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:decimal/decimal.dart'; +import 'package:event_bus/event_bus.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart'; import 'package:stackwallet/networking/tor_service.dart'; +import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; +import 'package:stackwallet/services/event_bus/global_event_bus.dart'; import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/prefs.dart'; import 'package:uuid/uuid.dart'; @@ -66,13 +70,14 @@ class ElectrumX { JsonRPC? _rpcClient; late Prefs _prefs; + late TorService _torService; List? failovers; int currentFailoverIndex = -1; final Duration connectionTimeoutForSpecialCaseJsonRPCClients; - ({String host, int port})? proxyInfo; + StreamSubscription? _torStatusListener; ElectrumX({ required String host, @@ -83,49 +88,60 @@ class ElectrumX { JsonRPC? client, this.connectionTimeoutForSpecialCaseJsonRPCClients = const Duration(seconds: 60), - ({String host, int port})? proxyInfo, + TorService? torService, + EventBus? globalEventBusForTesting, }) { _prefs = prefs; + _torService = torService ?? TorService.sharedInstance; _host = host; _port = port; _useSSL = useSSL; _rpcClient = client; + + final bus = globalEventBusForTesting ?? GlobalEventBus.instance; + _torStatusListener = bus.on().listen( + (event) async { + // not sure if we need to do anything specific here + // switch (event.status) { + // case TorStatus.enabled: + // case TorStatus.disabled: + // } + + // might be ok to just reset/kill the current _jsonRpcClient + + // since disconnecting is async and we want to ensure instant change over + // we will keep temp reference to current rpc client to call disconnect + // on before awaiting the disconnection future + + final temp = _rpcClient; + + // setting to null should force the creation of a new json rpc client + // on the next request sent through this electrumx instance + _rpcClient = null; + + await temp?.disconnect( + reason: "Tor status changed to \"${event.status}\"", + ); + }, + ); } factory ElectrumX.from({ required ElectrumXNode node, required Prefs prefs, required List failovers, - ({String host, int port})? proxyInfo, + TorService? torService, + EventBus? globalEventBusForTesting, }) { - if (Prefs.instance.useTor) { - if (proxyInfo == null) { - // TODO await tor / make sure it's running - proxyInfo = ( - host: InternetAddress.loopbackIPv4.address, - port: TorService.sharedInstance.port - ); - Logging.instance.log("ElectrumX.from(): tor detected at $proxyInfo", - level: LogLevel.Warning); - } - return ElectrumX( - host: node.address, - port: node.port, - useSSL: node.useSSL, - prefs: prefs, - failovers: failovers, - proxyInfo: proxyInfo, - ); - } else { - return ElectrumX( - host: node.address, - port: node.port, - useSSL: node.useSSL, - prefs: prefs, - failovers: failovers, - proxyInfo: null, - ); - } + return ElectrumX( + host: node.address, + port: node.port, + useSSL: node.useSSL, + prefs: prefs, + torService: torService, + failovers: failovers, + globalEventBusForTesting: globalEventBusForTesting, + ); } Future _allow() async { @@ -136,6 +152,59 @@ class ElectrumX { return true; } + void _checkRpcClient() { + if (_prefs.useTor) { + if (!_torService.enabled) { + throw Exception("Tor is not enabled"); + } + + final proxyInfo = _torService.proxyInfo; + + if (currentFailoverIndex == -1) { + _rpcClient ??= JsonRPC( + host: host, + port: port, + useSSL: useSSL, + connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, + proxyInfo: proxyInfo, + ); + } else { + _rpcClient ??= JsonRPC( + host: failovers![currentFailoverIndex].address, + port: failovers![currentFailoverIndex].port, + useSSL: failovers![currentFailoverIndex].useSSL, + connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, + proxyInfo: proxyInfo, + ); + } + + if (_rpcClient!.proxyInfo != proxyInfo) { + _rpcClient!.proxyInfo = proxyInfo; + _rpcClient!.disconnect( + reason: "Tor proxyInfo does not match current info", + ); + } + } else { + if (currentFailoverIndex == -1) { + _rpcClient ??= JsonRPC( + host: host, + port: port, + useSSL: useSSL, + connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, + proxyInfo: null, + ); + } else { + _rpcClient ??= JsonRPC( + host: failovers![currentFailoverIndex].address, + port: failovers![currentFailoverIndex].port, + useSSL: failovers![currentFailoverIndex].useSSL, + connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, + proxyInfo: null, + ); + } + } + } + /// Send raw rpc command Future request({ required String command, @@ -148,52 +217,7 @@ class ElectrumX { throw WifiOnlyException(); } - if (Prefs.instance.useTor) { - if (proxyInfo == null) { - // TODO await tor / make sure Tor is running - proxyInfo = ( - host: InternetAddress.loopbackIPv4.address, - port: TorService.sharedInstance.port - ); - Logging.instance.log("ElectrumX.request(): tor detected at $proxyInfo", - level: LogLevel.Warning); - } - if (currentFailoverIndex == -1) { - _rpcClient ??= JsonRPC( - host: host, - port: port, - useSSL: useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: proxyInfo, - ); - } else { - _rpcClient ??= JsonRPC( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - useSSL: failovers![currentFailoverIndex].useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: proxyInfo, - ); - } - } else { - if (currentFailoverIndex == -1) { - _rpcClient ??= JsonRPC( - host: host, - port: port, - useSSL: useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: null, - ); - } else { - _rpcClient ??= JsonRPC( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - useSSL: failovers![currentFailoverIndex].useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: null, - ); - } - } + _checkRpcClient(); try { final requestId = requestID ?? const Uuid().v1(); @@ -280,54 +304,7 @@ class ElectrumX { throw WifiOnlyException(); } - if (Prefs.instance.useTor) { - // TODO await tor / make sure Tor is initialized - if (proxyInfo == null) { - proxyInfo = ( - host: InternetAddress.loopbackIPv4.address, - port: TorService.sharedInstance.port - ); - Logging.instance.log( - "ElectrumX.batchRequest(): tor detected at $proxyInfo", - level: LogLevel.Warning); - } - - if (currentFailoverIndex == -1) { - _rpcClient ??= JsonRPC( - host: host, - port: port, - useSSL: useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: proxyInfo, - ); - } else { - _rpcClient = JsonRPC( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - useSSL: failovers![currentFailoverIndex].useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: proxyInfo, - ); - } - } else { - if (currentFailoverIndex == -1) { - _rpcClient ??= JsonRPC( - host: host, - port: port, - useSSL: useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: null, - ); - } else { - _rpcClient = JsonRPC( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - useSSL: failovers![currentFailoverIndex].useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: null, - ); - } - } + _checkRpcClient(); try { final List requestStrings = []; diff --git a/lib/electrumx_rpc/rpc.dart b/lib/electrumx_rpc/rpc.dart index 106b5a21b..a6335bf9e 100644 --- a/lib/electrumx_rpc/rpc.dart +++ b/lib/electrumx_rpc/rpc.dart @@ -26,13 +26,13 @@ class JsonRPC { required this.port, this.useSSL = false, this.connectionTimeout = const Duration(seconds: 60), - required ({String host, int port})? proxyInfo, + required ({InternetAddress host, int port})? proxyInfo, }); final bool useSSL; final String host; final int port; final Duration connectionTimeout; - ({String host, int port})? proxyInfo; + ({InternetAddress host, int port})? proxyInfo; final _requestMutex = Mutex(); final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue(); @@ -195,19 +195,17 @@ class JsonRPC { ); } else { if (proxyInfo == null) { - // TODO await tor / make sure it's running - proxyInfo = ( - host: InternetAddress.loopbackIPv4.address, - port: TorService.sharedInstance.port + proxyInfo = TorService.sharedInstance.proxyInfo; + Logging.instance.log( + "ElectrumX.connect(): tor detected at $proxyInfo", + level: LogLevel.Warning, ); - Logging.instance.log("ElectrumX.connect(): tor detected at $proxyInfo", - level: LogLevel.Warning); } // instantiate a socks socket at localhost and on the port selected by the tor service _socksSocket = await SOCKSSocket.create( - proxyHost: InternetAddress.loopbackIPv4.address, - proxyPort: TorService.sharedInstance.port, + proxyHost: proxyInfo!.host.address, + proxyPort: proxyInfo!.port, sslEnabled: useSSL, ); diff --git a/lib/networking/http.dart b/lib/networking/http.dart index 7203c91e2..5ab41a5d8 100644 --- a/lib/networking/http.dart +++ b/lib/networking/http.dart @@ -18,8 +18,8 @@ abstract class HTTP { if (routeOverTor) { SocksTCPClient.assignToHttpClient(httpClient, [ ProxySettings( - InternetAddress.loopbackIPv4, - TorService.sharedInstance.port, + TorService.sharedInstance.proxyInfo.host, + TorService.sharedInstance.proxyInfo.port, ), ]); } @@ -56,8 +56,8 @@ abstract class HTTP { if (routeOverTor) { SocksTCPClient.assignToHttpClient(httpClient, [ ProxySettings( - InternetAddress.loopbackIPv4, - TorService.sharedInstance.port, + TorService.sharedInstance.proxyInfo.host, + TorService.sharedInstance.proxyInfo.port, ), ]); } diff --git a/lib/networking/tor_service.dart b/lib/networking/tor_service.dart index 35df3509c..02d520db6 100644 --- a/lib/networking/tor_service.dart +++ b/lib/networking/tor_service.dart @@ -1,22 +1,70 @@ +import 'dart:io'; + import 'package:flutter_riverpod/flutter_riverpod.dart'; -import 'package:stackwallet/utilities/stack_file_system.dart'; +import 'package:stackwallet/utilities/logger.dart'; import 'package:tor/tor.dart'; final pTorService = Provider((_) => TorService.sharedInstance); class TorService { - static final sharedInstance = TorService(); final _tor = Tor(); + bool _enabled = false; - int get port => _tor.port; + TorService._(); + + static final sharedInstance = TorService._(); + + ({ + InternetAddress host, + int port, + }) get proxyInfo => ( + host: InternetAddress.loopbackIPv4, + port: _tor.port, + ); + + bool get enabled => _enabled; Future start() async { - final dir = await StackFileSystem.applicationTorDirectory(); - await _tor.start(); - return; + if (_enabled) { + // already started so just return + // could throw an exception here or something so the caller + // is explicitly made aware of this + return; + } + + try { + await _tor.start(); + // no exception or error so we can (probably?) assume tor + // has started successfully + _enabled = true; + } catch (e, s) { + Logging.instance.log( + "TorService.start failed: $e\n$s", + level: LogLevel.Warning, + ); + rethrow; + } } Future stop() async { - return await _tor.disable(); + if (!_enabled) { + // already stopped so just return + // could throw an exception here or something so the caller + // is explicitly made aware of this + return; + } + + try { + await _tor.disable(); + // no exception or error so we can (probably?) assume tor + // has started successfully + _enabled = false; + } catch (e, s) { + Logging.instance.log( + "TorService.stop failed: $e\n$s", + level: LogLevel.Warning, + ); + rethrow; + } } }