listen to tor status changes in electrumx

This commit is contained in:
julian 2023-09-07 12:07:05 -06:00
parent 6e625e2c74
commit d4e0f3b045
4 changed files with 169 additions and 146 deletions

View file

@ -8,14 +8,18 @@
*
*/
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:decimal/decimal.dart';
import 'package:event_bus/event_bus.dart';
import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart';
import 'package:stackwallet/networking/tor_service.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/global_event_bus.dart';
import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/prefs.dart';
import 'package:uuid/uuid.dart';
@ -66,13 +70,14 @@ class ElectrumX {
JsonRPC? _rpcClient;
late Prefs _prefs;
late TorService _torService;
List<ElectrumXNode>? failovers;
int currentFailoverIndex = -1;
final Duration connectionTimeoutForSpecialCaseJsonRPCClients;
({String host, int port})? proxyInfo;
StreamSubscription<TorStatusChangedEvent>? _torStatusListener;
ElectrumX({
required String host,
@ -83,49 +88,60 @@ class ElectrumX {
JsonRPC? client,
this.connectionTimeoutForSpecialCaseJsonRPCClients =
const Duration(seconds: 60),
({String host, int port})? proxyInfo,
TorService? torService,
EventBus? globalEventBusForTesting,
}) {
_prefs = prefs;
_torService = torService ?? TorService.sharedInstance;
_host = host;
_port = port;
_useSSL = useSSL;
_rpcClient = client;
final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
_torStatusListener = bus.on<TorStatusChangedEvent>().listen(
(event) async {
// not sure if we need to do anything specific here
// switch (event.status) {
// case TorStatus.enabled:
// case TorStatus.disabled:
// }
// might be ok to just reset/kill the current _jsonRpcClient
// since disconnecting is async and we want to ensure instant change over
// we will keep temp reference to current rpc client to call disconnect
// on before awaiting the disconnection future
final temp = _rpcClient;
// setting to null should force the creation of a new json rpc client
// on the next request sent through this electrumx instance
_rpcClient = null;
await temp?.disconnect(
reason: "Tor status changed to \"${event.status}\"",
);
},
);
}
factory ElectrumX.from({
required ElectrumXNode node,
required Prefs prefs,
required List<ElectrumXNode> failovers,
({String host, int port})? proxyInfo,
TorService? torService,
EventBus? globalEventBusForTesting,
}) {
if (Prefs.instance.useTor) {
if (proxyInfo == null) {
// TODO await tor / make sure it's running
proxyInfo = (
host: InternetAddress.loopbackIPv4.address,
port: TorService.sharedInstance.port
);
Logging.instance.log("ElectrumX.from(): tor detected at $proxyInfo",
level: LogLevel.Warning);
}
return ElectrumX(
host: node.address,
port: node.port,
useSSL: node.useSSL,
prefs: prefs,
failovers: failovers,
proxyInfo: proxyInfo,
);
} else {
return ElectrumX(
host: node.address,
port: node.port,
useSSL: node.useSSL,
prefs: prefs,
failovers: failovers,
proxyInfo: null,
);
}
return ElectrumX(
host: node.address,
port: node.port,
useSSL: node.useSSL,
prefs: prefs,
torService: torService,
failovers: failovers,
globalEventBusForTesting: globalEventBusForTesting,
);
}
Future<bool> _allow() async {
@ -136,6 +152,59 @@ class ElectrumX {
return true;
}
void _checkRpcClient() {
if (_prefs.useTor) {
if (!_torService.enabled) {
throw Exception("Tor is not enabled");
}
final proxyInfo = _torService.proxyInfo;
if (currentFailoverIndex == -1) {
_rpcClient ??= JsonRPC(
host: host,
port: port,
useSSL: useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: proxyInfo,
);
} else {
_rpcClient ??= JsonRPC(
host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port,
useSSL: failovers![currentFailoverIndex].useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: proxyInfo,
);
}
if (_rpcClient!.proxyInfo != proxyInfo) {
_rpcClient!.proxyInfo = proxyInfo;
_rpcClient!.disconnect(
reason: "Tor proxyInfo does not match current info",
);
}
} else {
if (currentFailoverIndex == -1) {
_rpcClient ??= JsonRPC(
host: host,
port: port,
useSSL: useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: null,
);
} else {
_rpcClient ??= JsonRPC(
host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port,
useSSL: failovers![currentFailoverIndex].useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: null,
);
}
}
}
/// Send raw rpc command
Future<dynamic> request({
required String command,
@ -148,52 +217,7 @@ class ElectrumX {
throw WifiOnlyException();
}
if (Prefs.instance.useTor) {
if (proxyInfo == null) {
// TODO await tor / make sure Tor is running
proxyInfo = (
host: InternetAddress.loopbackIPv4.address,
port: TorService.sharedInstance.port
);
Logging.instance.log("ElectrumX.request(): tor detected at $proxyInfo",
level: LogLevel.Warning);
}
if (currentFailoverIndex == -1) {
_rpcClient ??= JsonRPC(
host: host,
port: port,
useSSL: useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: proxyInfo,
);
} else {
_rpcClient ??= JsonRPC(
host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port,
useSSL: failovers![currentFailoverIndex].useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: proxyInfo,
);
}
} else {
if (currentFailoverIndex == -1) {
_rpcClient ??= JsonRPC(
host: host,
port: port,
useSSL: useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: null,
);
} else {
_rpcClient ??= JsonRPC(
host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port,
useSSL: failovers![currentFailoverIndex].useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: null,
);
}
}
_checkRpcClient();
try {
final requestId = requestID ?? const Uuid().v1();
@ -280,54 +304,7 @@ class ElectrumX {
throw WifiOnlyException();
}
if (Prefs.instance.useTor) {
// TODO await tor / make sure Tor is initialized
if (proxyInfo == null) {
proxyInfo = (
host: InternetAddress.loopbackIPv4.address,
port: TorService.sharedInstance.port
);
Logging.instance.log(
"ElectrumX.batchRequest(): tor detected at $proxyInfo",
level: LogLevel.Warning);
}
if (currentFailoverIndex == -1) {
_rpcClient ??= JsonRPC(
host: host,
port: port,
useSSL: useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: proxyInfo,
);
} else {
_rpcClient = JsonRPC(
host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port,
useSSL: failovers![currentFailoverIndex].useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: proxyInfo,
);
}
} else {
if (currentFailoverIndex == -1) {
_rpcClient ??= JsonRPC(
host: host,
port: port,
useSSL: useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: null,
);
} else {
_rpcClient = JsonRPC(
host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port,
useSSL: failovers![currentFailoverIndex].useSSL,
connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients,
proxyInfo: null,
);
}
}
_checkRpcClient();
try {
final List<String> requestStrings = [];

View file

@ -26,13 +26,13 @@ class JsonRPC {
required this.port,
this.useSSL = false,
this.connectionTimeout = const Duration(seconds: 60),
required ({String host, int port})? proxyInfo,
required ({InternetAddress host, int port})? proxyInfo,
});
final bool useSSL;
final String host;
final int port;
final Duration connectionTimeout;
({String host, int port})? proxyInfo;
({InternetAddress host, int port})? proxyInfo;
final _requestMutex = Mutex();
final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
@ -195,19 +195,17 @@ class JsonRPC {
);
} else {
if (proxyInfo == null) {
// TODO await tor / make sure it's running
proxyInfo = (
host: InternetAddress.loopbackIPv4.address,
port: TorService.sharedInstance.port
proxyInfo = TorService.sharedInstance.proxyInfo;
Logging.instance.log(
"ElectrumX.connect(): tor detected at $proxyInfo",
level: LogLevel.Warning,
);
Logging.instance.log("ElectrumX.connect(): tor detected at $proxyInfo",
level: LogLevel.Warning);
}
// instantiate a socks socket at localhost and on the port selected by the tor service
_socksSocket = await SOCKSSocket.create(
proxyHost: InternetAddress.loopbackIPv4.address,
proxyPort: TorService.sharedInstance.port,
proxyHost: proxyInfo!.host.address,
proxyPort: proxyInfo!.port,
sslEnabled: useSSL,
);

View file

@ -18,8 +18,8 @@ abstract class HTTP {
if (routeOverTor) {
SocksTCPClient.assignToHttpClient(httpClient, [
ProxySettings(
InternetAddress.loopbackIPv4,
TorService.sharedInstance.port,
TorService.sharedInstance.proxyInfo.host,
TorService.sharedInstance.proxyInfo.port,
),
]);
}
@ -56,8 +56,8 @@ abstract class HTTP {
if (routeOverTor) {
SocksTCPClient.assignToHttpClient(httpClient, [
ProxySettings(
InternetAddress.loopbackIPv4,
TorService.sharedInstance.port,
TorService.sharedInstance.proxyInfo.host,
TorService.sharedInstance.proxyInfo.port,
),
]);
}

View file

@ -1,22 +1,70 @@
import 'dart:io';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:stackwallet/utilities/stack_file_system.dart';
import 'package:stackwallet/utilities/logger.dart';
import 'package:tor/tor.dart';
final pTorService = Provider((_) => TorService.sharedInstance);
class TorService {
static final sharedInstance = TorService();
final _tor = Tor();
bool _enabled = false;
int get port => _tor.port;
TorService._();
static final sharedInstance = TorService._();
({
InternetAddress host,
int port,
}) get proxyInfo => (
host: InternetAddress.loopbackIPv4,
port: _tor.port,
);
bool get enabled => _enabled;
Future<void> start() async {
final dir = await StackFileSystem.applicationTorDirectory();
await _tor.start();
return;
if (_enabled) {
// already started so just return
// could throw an exception here or something so the caller
// is explicitly made aware of this
return;
}
try {
await _tor.start();
// no exception or error so we can (probably?) assume tor
// has started successfully
_enabled = true;
} catch (e, s) {
Logging.instance.log(
"TorService.start failed: $e\n$s",
level: LogLevel.Warning,
);
rethrow;
}
}
Future<void> stop() async {
return await _tor.disable();
if (!_enabled) {
// already stopped so just return
// could throw an exception here or something so the caller
// is explicitly made aware of this
return;
}
try {
await _tor.disable();
// no exception or error so we can (probably?) assume tor
// has started successfully
_enabled = false;
} catch (e, s) {
Logging.instance.log(
"TorService.stop failed: $e\n$s",
level: LogLevel.Warning,
);
rethrow;
}
}
}