tor connecting mutex

This commit is contained in:
julian 2023-09-08 10:37:50 -06:00
parent 5fcd03413f
commit 696b8bc8c3

View file

@ -15,8 +15,10 @@ import 'dart:io';
import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:decimal/decimal.dart'; import 'package:decimal/decimal.dart';
import 'package:event_bus/event_bus.dart'; import 'package:event_bus/event_bus.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart'; import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/global_event_bus.dart'; import 'package:stackwallet/services/event_bus/global_event_bus.dart';
import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/services/tor_service.dart';
@ -80,9 +82,15 @@ class ElectrumX {
// add finalizer to cancel stream subscription when all references to an // add finalizer to cancel stream subscription when all references to an
// instance of ElectrumX becomes inaccessible // instance of ElectrumX becomes inaccessible
static final Finalizer<ElectrumX> _finalizer = Finalizer( static final Finalizer<ElectrumX> _finalizer = Finalizer(
(p0) => p0._torStatusListener?.cancel(), (p0) {
p0._torPreferenceListener?.cancel();
p0._torStatusListener?.cancel();
},
); );
StreamSubscription<TorPreferenceChangedEvent>? _torStatusListener; StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
final Mutex _torConnectingLock = Mutex();
ElectrumX({ ElectrumX({
required String host, required String host,
@ -104,7 +112,21 @@ class ElectrumX {
_rpcClient = client; _rpcClient = client;
final bus = globalEventBusForTesting ?? GlobalEventBus.instance; final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
_torStatusListener = bus.on<TorPreferenceChangedEvent>().listen( _torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async {
switch (event.newStatus) {
case TorConnectionStatus.connecting:
await _torConnectingLock.acquire();
break;
case TorConnectionStatus.connected:
case TorConnectionStatus.disconnected:
_torConnectingLock.release();
break;
}
},
);
_torPreferenceListener = bus.on<TorPreferenceChangedEvent>().listen(
(event) async { (event) async {
// not sure if we need to do anything specific here // not sure if we need to do anything specific here
// switch (event.status) { // switch (event.status) {
@ -238,7 +260,7 @@ class ElectrumX {
throw WifiOnlyException(); throw WifiOnlyException();
} }
_checkRpcClient(); await _torConnectingLock.protect(() async => _checkRpcClient());
try { try {
final requestId = requestID ?? const Uuid().v1(); final requestId = requestID ?? const Uuid().v1();
@ -324,7 +346,7 @@ class ElectrumX {
throw WifiOnlyException(); throw WifiOnlyException();
} }
_checkRpcClient(); await _torConnectingLock.protect(() async => _checkRpcClient());
try { try {
final List<String> requestStrings = []; final List<String> requestStrings = [];