listen to tor connection and preferences events

This commit is contained in:
sneurlax 2024-02-05 14:33:52 -06:00
parent 53d7143156
commit 9835970751

View file

@ -12,10 +12,14 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:io'; import 'dart:io';
import 'package:event_bus/event_bus.dart';
import 'package:mutex/mutex.dart';
import 'package:socks_socket/socks_socket.dart'; import 'package:socks_socket/socks_socket.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/global_event_bus.dart';
import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/services/tor_service.dart';
import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/prefs.dart'; import 'package:stackwallet/utilities/prefs.dart';
@ -57,6 +61,10 @@ class SubscribableElectrumXClient {
late Prefs _prefs; late Prefs _prefs;
late TorService _torService; late TorService _torService;
StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
final Mutex _torConnectingLock = Mutex();
bool _requireMutex = false;
SubscribableElectrumXClient({ SubscribableElectrumXClient({
required bool useSSL, required bool useSSL,
@ -65,6 +73,7 @@ class SubscribableElectrumXClient {
this.onConnectionStatusChanged, this.onConnectionStatusChanged,
Duration connectionTimeout = const Duration(seconds: 5), Duration connectionTimeout = const Duration(seconds: 5),
Duration keepAlive = const Duration(seconds: 10), Duration keepAlive = const Duration(seconds: 10),
EventBus? globalEventBusForTesting,
}) { }) {
_useSSL = useSSL; _useSSL = useSSL;
_prefs = prefs; _prefs = prefs;
@ -72,8 +81,48 @@ class SubscribableElectrumXClient {
_connectionTimeout = connectionTimeout; _connectionTimeout = connectionTimeout;
_keepAlive = keepAlive; _keepAlive = keepAlive;
// TODO [prio=high]: Listen for TorConnectionStatusChangedEvent. // If we're testing, use the global event bus for testing.
// TODO [prio=high]: Listen for TorPreferenceChangedEvent. final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
// Listen to global event bus for Tor status changes.
_torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async {
switch (event.newStatus) {
case TorConnectionStatus.connecting:
// If Tor is connecting, we need to wait.
await _torConnectingLock.acquire();
_requireMutex = true;
break;
case TorConnectionStatus.connected:
case TorConnectionStatus.disconnected:
// If Tor is connected or disconnected, we can release the lock.
if (_torConnectingLock.isLocked) {
_torConnectingLock.release();
}
_requireMutex = false;
break;
}
},
);
// Listen to global event bus for Tor preference changes.
_torPreferenceListener = bus.on<TorPreferenceChangedEvent>().listen(
(event) async {
// Close open socket (if open).
final tempSocket = _socket;
_socket = null;
await tempSocket?.close();
// Close open SOCKS socket (if open).
final tempSOCKSSocket = _socksSocket;
_socksSocket = null;
await tempSOCKSSocket?.close();
// Clear subscriptions.
_tasks.clear();
},
);
} }
factory SubscribableElectrumXClient.from({ factory SubscribableElectrumXClient.from({
@ -109,11 +158,19 @@ class SubscribableElectrumXClient {
required String host, required String host,
required int port, required int port,
}) async { }) async {
// If we're already connected, disconnect first.
try { try {
await _socket?.close(); await _socket?.close();
} catch (_) {} } catch (_) {}
// If we're connecting to Tor, wait.
if (_requireMutex) {
// Just use a dummy function that waits for the lock to be released.
await _torConnectingLock.protect(() async {});
}
if (!Prefs.instance.useTor) { if (!Prefs.instance.useTor) {
// If we're not supposed to use Tor, then connect directly.
await connectClearnet(host, port); await connectClearnet(host, port);
} else { } else {
// If we're supposed to use Tor... // If we're supposed to use Tor...
@ -413,10 +470,34 @@ class SubscribableElectrumXClient {
required String method, required String method,
List<dynamic> params = const [], List<dynamic> params = const [],
}) async { }) async {
// If we're connecting to Tor, wait.
if (_requireMutex) {
// Just use a dummy function that waits for the lock to be released.
await _torConnectingLock.protect(() async {});
}
// Check socket is connected.
if (_prefs.useTor) {
if (_socksSocket == null) {
final msg = "SubscribableElectrumXClient._call: "
"SOCKSSocket is not connected. Method $method, params $params.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
} else {
if (_socket == null) {
final msg = "SubscribableElectrumXClient._call: "
"Socket is not connected. Method $method, params $params.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
}
final completer = Completer<dynamic>(); final completer = Completer<dynamic>();
_currentRequestID++; _currentRequestID++;
final id = _currentRequestID.toString(); final id = _currentRequestID.toString();
// Write to the socket.
try { try {
_addTask(id: id, completer: completer); _addTask(id: id, completer: completer);
@ -454,10 +535,34 @@ class SubscribableElectrumXClient {
List<dynamic> params = const [], List<dynamic> params = const [],
Duration timeout = const Duration(seconds: 2), Duration timeout = const Duration(seconds: 2),
}) async { }) async {
// If we're connecting to Tor, wait.
if (_requireMutex) {
// Just use a dummy function that waits for the lock to be released.
await _torConnectingLock.protect(() async {});
}
// Check socket is connected.
if (_prefs.useTor) {
if (_socksSocket == null) {
final msg = "SubscribableElectrumXClient._call: "
"SOCKSSocket is not connected. Method $method, params $params.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
} else {
if (_socket == null) {
final msg = "SubscribableElectrumXClient._call: "
"Socket is not connected. Method $method, params $params.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
}
final completer = Completer<dynamic>(); final completer = Completer<dynamic>();
_currentRequestID++; _currentRequestID++;
final id = _currentRequestID.toString(); final id = _currentRequestID.toString();
// Write to the socket.
try { try {
_addTask(id: id, completer: completer); _addTask(id: id, completer: completer);
@ -507,6 +612,24 @@ class SubscribableElectrumXClient {
_addSubscriptionTask(id: id, subscription: subscription); _addSubscriptionTask(id: id, subscription: subscription);
_currentRequestID++; _currentRequestID++;
// Check socket is connected.
if (_prefs.useTor) {
if (_socksSocket == null) {
final msg = "SubscribableElectrumXClient._call: "
"SOCKSSocket is not connected. Method $method, params $params.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
} else {
if (_socket == null) {
final msg = "SubscribableElectrumXClient._call: "
"Socket is not connected. Method $method, params $params.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
}
// Write to the socket.
if (_prefs.useTor) { if (_prefs.useTor) {
_socksSocket?.write( _socksSocket?.write(
_buildJsonRequestString( _buildJsonRequestString(
@ -539,6 +662,13 @@ class SubscribableElectrumXClient {
/// ///
/// Returns true if ping succeeded /// Returns true if ping succeeded
Future<bool> ping() async { Future<bool> ping() async {
// If we're connecting to Tor, wait.
if (_requireMutex) {
// Just use a dummy function that waits for the lock to be released.
await _torConnectingLock.protect(() async {});
}
// Write to the socket.
try { try {
final response = (await _callWithTimeout(method: "server.ping")) as Map; final response = (await _callWithTimeout(method: "server.ping")) as Map;
return response.keys.contains("result") && response["result"] == null; return response.keys.contains("result") && response["result"] == null;